From b1cde58e3fa637785791dbeb066ee9cd195d78af Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Fri, 9 Aug 2024 15:15:03 +0800 Subject: [PATCH] fix --- .../flink/sink/batch/BatchBufferStream.java | 9 +- .../flink/sink/batch/BatchRecordBuffer.java | 5 +- .../sink/batch/DorisBatchStreamLoad.java | 10 +- .../sink/batch/TestBatchBufferHttpEntity.java | 46 +++++++++ .../sink/batch/TestBatchBufferStream.java | 95 +++++++++++++++++++ 5 files changed, 153 insertions(+), 12 deletions(-) create mode 100644 flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchBufferHttpEntity.java create mode 100644 flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchBufferStream.java diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferStream.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferStream.java index e0d4dc374..a782bb53f 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferStream.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferStream.java @@ -24,7 +24,7 @@ public class BatchBufferStream extends InputStream { private final Iterator iterator; - private byte[] currentItem; + private byte[] currentRow; private int currentPos; public BatchBufferStream(List buffer) { @@ -43,11 +43,11 @@ public int read(byte[] buf) throws IOException { @Override public int read(byte[] buf, int off, int len) throws IOException { - if (!iterator.hasNext() && currentItem == null) { + if (!iterator.hasNext() && currentRow == null) { return -1; } - byte[] item = currentItem; + byte[] item = currentRow; int pos = currentPos; int readBytes = 0; while (readBytes < len && (item != null || iterator.hasNext())) { @@ -66,8 +66,7 @@ public int read(byte[] buf, int off, int len) throws IOException { pos = 0; } } - - currentItem = item; + currentRow = item; currentPos = pos; return readBytes; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java index 54b4b2abc..0e38f2728 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java @@ -55,10 +55,11 @@ public void insert(byte[] record) { loadBatchFirstRecord = false; } else if (lineDelimiter != null) { this.buffer.add(this.lineDelimiter); + setBufferSizeBytes(this.bufferSizeBytes + this.lineDelimiter.length); } this.buffer.add(record); - setNumOfRecords(getNumOfRecords() + 1); - setBufferSizeBytes(getBufferSizeBytes() + record.length); + setNumOfRecords(this.numOfRecords + 1); + setBufferSizeBytes(this.bufferSizeBytes + record.length); } public String getLabelName() { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index c98ace43f..e457c56c6 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -163,8 +163,7 @@ public DorisBatchStreamLoad( * @param record * @throws IOException */ - public synchronized void writeRecord(String database, String table, byte[] record) - throws InterruptedException { + public synchronized void writeRecord(String database, String table, byte[] record) { checkFlushException(); String bufferKey = getTableIdentifier(database, table); BatchRecordBuffer buffer = @@ -184,15 +183,15 @@ public synchronized void writeRecord(String database, String table, byte[] recor } } - public synchronized boolean bufferFullFlush(String bufferKey) throws InterruptedException { + public synchronized boolean bufferFullFlush(String bufferKey) { return doFlush(bufferKey, false, true); } - public synchronized boolean intervalFlush() throws InterruptedException { + public synchronized boolean intervalFlush() { return doFlush(null, false, false); } - public synchronized boolean checkpointFlush() throws InterruptedException { + public synchronized boolean checkpointFlush() { return doFlush(null, true, false); } @@ -336,6 +335,7 @@ private void mergeBuffer(List recordList, BatchRecordBuffer b buffer.getBuffer().addAll(recordBuffer.getBuffer()); } } + LOG.info("merge {} buffer to one stream load", recordList.size()); } } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchBufferHttpEntity.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchBufferHttpEntity.java new file mode 100644 index 000000000..fe20c5442 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchBufferHttpEntity.java @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.batch; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestBatchBufferHttpEntity { + + @Test + public void testWrite() throws Exception { + BatchRecordBuffer recordBuffer = TestBatchBufferStream.mockBuffer(); + byte[] expectedData = TestBatchBufferStream.mergeByteArrays(recordBuffer.getBuffer()); + Assert.assertEquals(recordBuffer.getNumOfRecords(), 1000); + + BatchBufferHttpEntity entity = new BatchBufferHttpEntity(recordBuffer); + assertTrue(entity.isRepeatable()); + assertFalse(entity.isStreaming()); + assertEquals(entity.getContentLength(), expectedData.length); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + entity.writeTo(outputStream); + assertArrayEquals(expectedData, outputStream.toByteArray()); + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchBufferStream.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchBufferStream.java new file mode 100644 index 000000000..3dad5b60a --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchBufferStream.java @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.batch; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.Random; +import java.util.UUID; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestBatchBufferStream { + + @Test + public void testRead() throws Exception { + BatchRecordBuffer recordBuffer = mockBuffer(); + byte[] expectedData = mergeByteArrays(recordBuffer.getBuffer()); + Assert.assertEquals(recordBuffer.getNumOfRecords(), 1000); + + byte[] actualData = new byte[(int) recordBuffer.getBufferSizeBytes()]; + try (BatchBufferStream inputStream = new BatchBufferStream(recordBuffer.getBuffer())) { + int len = inputStream.read(actualData, 0, actualData.length); + assertEquals(actualData.length, len); + assertArrayEquals(expectedData, actualData); + } + } + + @Test + public void testReadBufLen() throws Exception { + BatchRecordBuffer recordBuffer = mockBuffer(); + byte[] expectedData = mergeByteArrays(recordBuffer.getBuffer()); + Assert.assertEquals(recordBuffer.getNumOfRecords(), 1000); + + byte[] actualData = new byte[(int) recordBuffer.getBufferSizeBytes()]; + try (BatchBufferStream inputStream = new BatchBufferStream(recordBuffer.getBuffer())) { + int pos = 0; + while (pos < actualData.length) { + // mock random length + int maxLen = new Random().nextInt(actualData.length - pos) + 1; + int len = inputStream.read(actualData, pos, maxLen); + if (len == -1) { + break; + } + assertTrue(len > 0 && len <= maxLen); + pos += len; + } + assertEquals(actualData.length, pos); + assertArrayEquals(expectedData, actualData); + } + } + + public static BatchRecordBuffer mockBuffer() { + BatchRecordBuffer recordBuffer = new BatchRecordBuffer(); + for (int i = 0; i < 1000; i++) { + recordBuffer.insert((UUID.randomUUID() + "," + i).getBytes()); + } + return recordBuffer; + } + + public static byte[] mergeByteArrays(List listOfByteArrays) { + int totalLength = 0; + for (byte[] byteArray : listOfByteArrays) { + totalLength += byteArray.length; + } + + byte[] mergedArray = new byte[totalLength]; + + int currentPosition = 0; + for (byte[] byteArray : listOfByteArrays) { + System.arraycopy(byteArray, 0, mergedArray, currentPosition, byteArray.length); + currentPosition += byteArray.length; + } + + return mergedArray; + } +}