Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
jerry-024 committed Jan 6, 2025
1 parent 90c9144 commit f9d9803
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.paimon.options.Options;
import org.apache.paimon.rest.MockRESTMessage;
import org.apache.paimon.rest.RESTCatalogFactory;
import org.apache.paimon.rest.RESTCatalogInternalOptions;
import org.apache.paimon.rest.RESTCatalogOptions;
import org.apache.paimon.rest.RESTObjectMapper;
import org.apache.paimon.rest.responses.GetDatabaseResponse;
Expand All @@ -33,8 +32,6 @@
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.Catalog;
Expand Down Expand Up @@ -66,7 +63,7 @@
public class FlinkRESTCatalogTest {
private final ObjectMapper mapper = RESTObjectMapper.create();
private final ObjectPath path1 = new ObjectPath("db1", "t1");
private MockWebServer mockWebServer;
private MockRESTCatalogServer mockRESTCatalogServer;
private String serverUrl;
private String warehouse;
private Catalog catalog;
Expand All @@ -75,9 +72,9 @@ public class FlinkRESTCatalogTest {

@Before
public void beforeEach() throws IOException {
mockWebServer = new MockWebServer();
mockWebServer.start();
serverUrl = mockWebServer.url("").toString();
mockRESTCatalogServer = new MockRESTCatalogServer(warehouse);
mockRESTCatalogServer.start();
serverUrl = mockRESTCatalogServer.getUrl();
Options options = new Options();
options.set(RESTCatalogOptions.URI, serverUrl);
String initToken = "init_token";
Expand All @@ -86,7 +83,6 @@ public void beforeEach() throws IOException {
warehouse = new File(temporaryFolder.newFolder(), UUID.randomUUID().toString()).toString();
options.set(LOG_SYSTEM_AUTO_REGISTER, true);
options.set(CatalogOptions.METASTORE, RESTCatalogFactory.IDENTIFIER);
mockConfig(warehouse);
GetDatabaseResponse response =
MockRESTMessage.getDatabaseResponse(
org.apache.paimon.catalog.Catalog.DEFAULT_DATABASE);
Expand All @@ -100,7 +96,7 @@ public void beforeEach() throws IOException {

@After
public void tearDown() throws IOException {
mockWebServer.shutdown();
mockRESTCatalogServer.shutdown();
}

@Test
Expand Down Expand Up @@ -150,20 +146,4 @@ private ResolvedSchema createSchema() {
Collections.emptyList(),
null);
}

private void mockConfig(String warehouseStr) {
String mockResponse =
String.format(
"{\"defaults\": {\"%s\": \"%s\", \"%s\": \"%s\"}}",
RESTCatalogInternalOptions.PREFIX.key(),
"prefix",
CatalogOptions.WAREHOUSE.key(),
warehouseStr);
mockResponse(mockResponse, 200);
}

private void mockResponse(String mockContent, int httpCode) {
MockResponse mockResponse = MockRESTMessage.mockResponse(mockContent, httpCode);
mockWebServer.enqueue(mockResponse);
}
}
Original file line number Diff line number Diff line change
@@ -1,46 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink;

import java.io.IOException;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.rest.RESTCatalogInternalOptions;

import okhttp3.mockwebserver.Dispatcher;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;

import java.io.IOException;

import static org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER;

/** Mock REST server for testing. */
public class MockRESTCatalogServer {
public static void main(String[] args) {
MockWebServer server = new MockWebServer();
final Dispatcher dispatcher = new Dispatcher() {

private final Catalog catalog;
private final Dispatcher dispatcher;
private final MockWebServer server;

public MockRESTCatalogServer(String warehouse) {
Options conf = new Options();
conf.setString("warehouse", warehouse);
conf.set(LOG_SYSTEM_AUTO_REGISTER, true);
this.catalog =
CatalogFactory.createCatalog(
CatalogContext.create(conf), this.getClass().getClassLoader());
this.dispatcher = initDispatcher();
MockWebServer mockWebServer = new MockWebServer();
mockWebServer.setDispatcher(dispatcher);
server = mockWebServer;
}

public void start() throws IOException {
server.start();
}

public String getUrl() {
return server.url("").toString();
}

public void shutdown() throws IOException {
server.shutdown();
}

public static Dispatcher initDispatcher() {
return new Dispatcher() {
@Override
public MockResponse dispatch(RecordedRequest request) throws InterruptedException {

switch (request.getPath()) {
case "/v1/login/auth/":
return new MockResponse().setResponseCode(200);
case "/v1/check/version/":
case "/v1/config":
return new MockResponse()
.setResponseCode(200)
.setBody(getConfigBody("/tmp/1"));
case "/v1/prefix/databases/":
return new MockResponse().setResponseCode(200).setBody("version=9");
case "/v1/profile/info":
return new MockResponse().setResponseCode(200).setBody("profile");
}
return new MockResponse().setResponseCode(404);
}
};
server.setDispatcher(dispatcher);
try {
server.start(8099);
String serverUrl = server.url("").toString();
} catch (IOException e) {
e.printStackTrace();
System.exit(0);
}
while (true) {
try {
Thread.sleep(1000);
RecordedRequest request = server.takeRequest();
System.out.println("Request: " + request.getPath());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

private static String getConfigBody(String warehouseStr) {
return String.format(
"{\"defaults\": {\"%s\": \"%s\", \"%s\": \"%s\"}}",
RESTCatalogInternalOptions.PREFIX.key(),
"prefix",
CatalogOptions.WAREHOUSE.key(),
warehouseStr);
}
}

0 comments on commit f9d9803

Please sign in to comment.