Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Aug 9, 2024
1 parent 0f5b84d commit b1cde58
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

public class BatchBufferStream extends InputStream {
private final Iterator<byte[]> iterator;
private byte[] currentItem;
private byte[] currentRow;
private int currentPos;

public BatchBufferStream(List<byte[]> buffer) {
Expand All @@ -43,11 +43,11 @@ public int read(byte[] buf) throws IOException {

@Override
public int read(byte[] buf, int off, int len) throws IOException {
if (!iterator.hasNext() && currentItem == null) {
if (!iterator.hasNext() && currentRow == null) {
return -1;
}

byte[] item = currentItem;
byte[] item = currentRow;
int pos = currentPos;
int readBytes = 0;
while (readBytes < len && (item != null || iterator.hasNext())) {
Expand All @@ -66,8 +66,7 @@ public int read(byte[] buf, int off, int len) throws IOException {
pos = 0;
}
}

currentItem = item;
currentRow = item;
currentPos = pos;
return readBytes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ public void insert(byte[] record) {
loadBatchFirstRecord = false;
} else if (lineDelimiter != null) {
this.buffer.add(this.lineDelimiter);
setBufferSizeBytes(this.bufferSizeBytes + this.lineDelimiter.length);
}
this.buffer.add(record);
setNumOfRecords(getNumOfRecords() + 1);
setBufferSizeBytes(getBufferSizeBytes() + record.length);
setNumOfRecords(this.numOfRecords + 1);
setBufferSizeBytes(this.bufferSizeBytes + record.length);
}

public String getLabelName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,7 @@ public DorisBatchStreamLoad(
* @param record
* @throws IOException
*/
public synchronized void writeRecord(String database, String table, byte[] record)
throws InterruptedException {
public synchronized void writeRecord(String database, String table, byte[] record) {
checkFlushException();
String bufferKey = getTableIdentifier(database, table);
BatchRecordBuffer buffer =
Expand All @@ -184,15 +183,15 @@ public synchronized void writeRecord(String database, String table, byte[] recor
}
}

public synchronized boolean bufferFullFlush(String bufferKey) throws InterruptedException {
public synchronized boolean bufferFullFlush(String bufferKey) {
return doFlush(bufferKey, false, true);
}

public synchronized boolean intervalFlush() throws InterruptedException {
public synchronized boolean intervalFlush() {
return doFlush(null, false, false);
}

public synchronized boolean checkpointFlush() throws InterruptedException {
public synchronized boolean checkpointFlush() {
return doFlush(null, true, false);
}

Expand Down Expand Up @@ -336,6 +335,7 @@ private void mergeBuffer(List<BatchRecordBuffer> recordList, BatchRecordBuffer b
buffer.getBuffer().addAll(recordBuffer.getBuffer());
}
}
LOG.info("merge {} buffer to one stream load", recordList.size());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.sink.batch;

import org.junit.Assert;
import org.junit.Test;

import java.io.ByteArrayOutputStream;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class TestBatchBufferHttpEntity {

@Test
public void testWrite() throws Exception {
BatchRecordBuffer recordBuffer = TestBatchBufferStream.mockBuffer();
byte[] expectedData = TestBatchBufferStream.mergeByteArrays(recordBuffer.getBuffer());
Assert.assertEquals(recordBuffer.getNumOfRecords(), 1000);

BatchBufferHttpEntity entity = new BatchBufferHttpEntity(recordBuffer);
assertTrue(entity.isRepeatable());
assertFalse(entity.isStreaming());
assertEquals(entity.getContentLength(), expectedData.length);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
entity.writeTo(outputStream);
assertArrayEquals(expectedData, outputStream.toByteArray());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.sink.batch;

import org.junit.Assert;
import org.junit.Test;

import java.util.List;
import java.util.Random;
import java.util.UUID;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class TestBatchBufferStream {

@Test
public void testRead() throws Exception {
BatchRecordBuffer recordBuffer = mockBuffer();
byte[] expectedData = mergeByteArrays(recordBuffer.getBuffer());
Assert.assertEquals(recordBuffer.getNumOfRecords(), 1000);

byte[] actualData = new byte[(int) recordBuffer.getBufferSizeBytes()];
try (BatchBufferStream inputStream = new BatchBufferStream(recordBuffer.getBuffer())) {
int len = inputStream.read(actualData, 0, actualData.length);
assertEquals(actualData.length, len);
assertArrayEquals(expectedData, actualData);
}
}

@Test
public void testReadBufLen() throws Exception {
BatchRecordBuffer recordBuffer = mockBuffer();
byte[] expectedData = mergeByteArrays(recordBuffer.getBuffer());
Assert.assertEquals(recordBuffer.getNumOfRecords(), 1000);

byte[] actualData = new byte[(int) recordBuffer.getBufferSizeBytes()];
try (BatchBufferStream inputStream = new BatchBufferStream(recordBuffer.getBuffer())) {
int pos = 0;
while (pos < actualData.length) {
// mock random length
int maxLen = new Random().nextInt(actualData.length - pos) + 1;
int len = inputStream.read(actualData, pos, maxLen);
if (len == -1) {
break;
}
assertTrue(len > 0 && len <= maxLen);
pos += len;
}
assertEquals(actualData.length, pos);
assertArrayEquals(expectedData, actualData);
}
}

public static BatchRecordBuffer mockBuffer() {
BatchRecordBuffer recordBuffer = new BatchRecordBuffer();
for (int i = 0; i < 1000; i++) {
recordBuffer.insert((UUID.randomUUID() + "," + i).getBytes());
}
return recordBuffer;
}

public static byte[] mergeByteArrays(List<byte[]> listOfByteArrays) {
int totalLength = 0;
for (byte[] byteArray : listOfByteArrays) {
totalLength += byteArray.length;
}

byte[] mergedArray = new byte[totalLength];

int currentPosition = 0;
for (byte[] byteArray : listOfByteArrays) {
System.arraycopy(byteArray, 0, mergedArray, currentPosition, byteArray.length);
currentPosition += byteArray.length;
}

return mergedArray;
}
}

0 comments on commit b1cde58

Please sign in to comment.