From 374711e3bbe6fafbbc41b115eefd74d45d244f26 Mon Sep 17 00:00:00 2001 From: chenzhuoyu Date: Sun, 18 Feb 2024 16:21:49 +0800 Subject: [PATCH] fix review --- .../paimon/{ => gateway}/StreamLoadServer.java | 11 ++++------- .../{ => gateway}/config/NettyServerConfig.java | 2 +- .../exception/RemoteException.java | 2 +- .../{ => gateway}/handler/LoadHttpHandler.java | 17 +++++++++++++---- .../reader/ExcelWriteStrategy.java | 2 +- .../{ => gateway}/reader/WriteStrategy.java | 2 +- .../utils/HttpServerResponseUtil.java | 2 +- .../utils/RequestParameterUtil.java | 2 +- 8 files changed, 23 insertions(+), 17 deletions(-) rename paimon-service/paimon-service-load/src/main/java/org/apache/paimon/{ => gateway}/StreamLoadServer.java (96%) rename paimon-service/paimon-service-load/src/main/java/org/apache/paimon/{ => gateway}/config/NettyServerConfig.java (99%) rename paimon-service/paimon-service-load/src/main/java/org/apache/paimon/{ => gateway}/exception/RemoteException.java (95%) rename paimon-service/paimon-service-load/src/main/java/org/apache/paimon/{ => gateway}/handler/LoadHttpHandler.java (89%) rename paimon-service/paimon-service-load/src/main/java/org/apache/paimon/{ => gateway}/reader/ExcelWriteStrategy.java (96%) rename paimon-service/paimon-service-load/src/main/java/org/apache/paimon/{ => gateway}/reader/WriteStrategy.java (96%) rename paimon-service/paimon-service-load/src/main/java/org/apache/paimon/{ => gateway}/utils/HttpServerResponseUtil.java (98%) rename paimon-service/paimon-service-load/src/main/java/org/apache/paimon/{ => gateway}/utils/RequestParameterUtil.java (97%) diff --git a/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/StreamLoadServer.java b/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/gateway/StreamLoadServer.java similarity index 96% rename from paimon-service/paimon-service-load/src/main/java/org/apache/paimon/StreamLoadServer.java rename to paimon-service/paimon-service-load/src/main/java/org/apache/paimon/gateway/StreamLoadServer.java index 280b1b9a08dc..645440175ef0 100644 --- a/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/StreamLoadServer.java +++ b/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/gateway/StreamLoadServer.java @@ -16,15 +16,15 @@ * limitations under the License. */ -package org.apache.paimon; +package org.apache.paimon.gateway; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; -import org.apache.paimon.config.NettyServerConfig; -import org.apache.paimon.exception.RemoteException; +import org.apache.paimon.gateway.config.NettyServerConfig; +import org.apache.paimon.gateway.exception.RemoteException; import org.apache.paimon.options.Options; -import org.apache.paimon.handler.LoadHttpHandler; +import org.apache.paimon.gateway.handler.LoadHttpHandler; import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -40,17 +40,14 @@ import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContentDecompressor; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec; import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler; -import org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.flink.shaded.netty4.io.netty.channel.epoll.Epoll.isAvailable; diff --git a/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/config/NettyServerConfig.java b/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/gateway/config/NettyServerConfig.java similarity index 99% rename from paimon-service/paimon-service-load/src/main/java/org/apache/paimon/config/NettyServerConfig.java rename to paimon-service/paimon-service-load/src/main/java/org/apache/paimon/gateway/config/NettyServerConfig.java index a78a766e210f..337443b2bd92 100644 --- a/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/config/NettyServerConfig.java +++ b/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/gateway/config/NettyServerConfig.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.paimon.config; +package org.apache.paimon.gateway.config; /** dfdf. */ public class NettyServerConfig { diff --git a/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/exception/RemoteException.java b/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/gateway/exception/RemoteException.java similarity index 95% rename from paimon-service/paimon-service-load/src/main/java/org/apache/paimon/exception/RemoteException.java rename to paimon-service/paimon-service-load/src/main/java/org/apache/paimon/gateway/exception/RemoteException.java index 911f43b93091..0fa0ac9389f1 100644 --- a/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/exception/RemoteException.java +++ b/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/gateway/exception/RemoteException.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.paimon.exception; +package org.apache.paimon.gateway.exception; /** Custom runtime exception. */ public class RemoteException extends RuntimeException { diff --git a/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/handler/LoadHttpHandler.java b/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/gateway/handler/LoadHttpHandler.java similarity index 89% rename from paimon-service/paimon-service-load/src/main/java/org/apache/paimon/handler/LoadHttpHandler.java rename to paimon-service/paimon-service-load/src/main/java/org/apache/paimon/gateway/handler/LoadHttpHandler.java index 06b16305314c..176e9cef7f00 100644 --- a/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/handler/LoadHttpHandler.java +++ b/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/gateway/handler/LoadHttpHandler.java @@ -16,12 +16,16 @@ * limitations under the License. */ -package org.apache.paimon.handler; +package org.apache.paimon.gateway.handler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.reader.ExcelWriteStrategy; -import org.apache.paimon.reader.WriteStrategy; +import org.apache.paimon.gateway.reader.ExcelWriteStrategy; +import org.apache.paimon.gateway.reader.WriteStrategy; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.Table; import org.apache.paimon.table.sink.BatchTableCommit; @@ -49,7 +53,7 @@ import java.util.List; import java.util.Optional; -import static org.apache.paimon.utils.HttpServerResponseUtil.response; +import static org.apache.paimon.gateway.utils.HttpServerResponseUtil.response; /** An HTTP handler for loading data into a catalog, handling various HTTP events and requests. */ @ChannelHandler.Sharable @@ -174,6 +178,11 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex + " \"CommitAndPublishTimeMs\": 106\n" + "}"); } + if (msg instanceof LastHttpContent) { + // 所有的数据块都已经接收完毕 + ctx.writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)); + // 这里可以清理资源,比如关闭文件输出流等 + } } private static Optional extractDbAndTable(String path) { diff --git a/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/reader/ExcelWriteStrategy.java b/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/gateway/reader/ExcelWriteStrategy.java similarity index 96% rename from paimon-service/paimon-service-load/src/main/java/org/apache/paimon/reader/ExcelWriteStrategy.java rename to paimon-service/paimon-service-load/src/main/java/org/apache/paimon/gateway/reader/ExcelWriteStrategy.java index 007496087481..892959d1135b 100644 --- a/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/reader/ExcelWriteStrategy.java +++ b/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/gateway/reader/ExcelWriteStrategy.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.paimon.reader; +package org.apache.paimon.gateway.reader; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.sink.BatchTableWrite; diff --git a/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/reader/WriteStrategy.java b/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/gateway/reader/WriteStrategy.java similarity index 96% rename from paimon-service/paimon-service-load/src/main/java/org/apache/paimon/reader/WriteStrategy.java rename to paimon-service/paimon-service-load/src/main/java/org/apache/paimon/gateway/reader/WriteStrategy.java index a8ea26c48c0d..462ca4c821c3 100644 --- a/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/reader/WriteStrategy.java +++ b/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/gateway/reader/WriteStrategy.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.paimon.reader; +package org.apache.paimon.gateway.reader; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.sink.BatchTableWrite; diff --git a/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/utils/HttpServerResponseUtil.java b/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/gateway/utils/HttpServerResponseUtil.java similarity index 98% rename from paimon-service/paimon-service-load/src/main/java/org/apache/paimon/utils/HttpServerResponseUtil.java rename to paimon-service/paimon-service-load/src/main/java/org/apache/paimon/gateway/utils/HttpServerResponseUtil.java index d4bb077d0486..9de6c3eb4453 100644 --- a/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/utils/HttpServerResponseUtil.java +++ b/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/gateway/utils/HttpServerResponseUtil.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.utils; +package org.apache.paimon.gateway.utils; import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; diff --git a/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/utils/RequestParameterUtil.java b/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/gateway/utils/RequestParameterUtil.java similarity index 97% rename from paimon-service/paimon-service-load/src/main/java/org/apache/paimon/utils/RequestParameterUtil.java rename to paimon-service/paimon-service-load/src/main/java/org/apache/paimon/gateway/utils/RequestParameterUtil.java index ffeefba7dfeb..3583ab34903e 100644 --- a/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/utils/RequestParameterUtil.java +++ b/paimon-service/paimon-service-load/src/main/java/org/apache/paimon/gateway/utils/RequestParameterUtil.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.utils; +package org.apache.paimon.gateway.utils; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.QueryStringDecoder;