Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Aug 12, 2024
1 parent af5d301 commit 336f865
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public class DorisBatchStreamLoad implements Serializable {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final List<String> DORIS_SUCCESS_STATUS =
new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT));
private static final long STREAM_LOAD_MAX_BYTES = 10 * 1024 * 1024 * 1024L; //10 GB
private final LabelGenerator labelGenerator;
private final byte[] lineDelimiter;
private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";
Expand All @@ -106,9 +107,7 @@ public class DorisBatchStreamLoad implements Serializable {
private boolean enableGzCompress;
private int subTaskId;
private long streamLoadMaxRows = Integer.MAX_VALUE;
// todo:Can be configured
private long streamLoadMaxBytes = 1024 * 1024 * 1024;
private long maxBlockedBytes = 200 * 1024 * 1024;
private long maxBlockedBytes;
private final AtomicLong currentCacheBytes = new AtomicLong(0L);
private final Lock lock = new ReentrantLock();
private final Condition block = lock.newCondition();
Expand Down Expand Up @@ -146,6 +145,8 @@ public DorisBatchStreamLoad(
this.enableGzCompress = loadProps.getProperty(COMPRESS_TYPE, "").equals(COMPRESS_TYPE_GZ);
this.executionOptions = executionOptions;
this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize());
// maxBlockedBytes ensures that a buffer can be written even if the queue is full
this.maxBlockedBytes = (long) executionOptions.getBufferFlushMaxBytes() * (executionOptions.getFlushQueueSize() + 1);
if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) {
String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
Preconditions.checkState(
Expand Down Expand Up @@ -216,7 +217,7 @@ public synchronized void writeRecord(String database, String table, byte[] recor
LOG.info("trigger flush by buffer full, flush: {}", flush);

} else if (buffer.getBufferSizeBytes() >= streamLoadMaxRows
|| buffer.getNumOfRecords() >= streamLoadMaxBytes) {
|| buffer.getNumOfRecords() >= STREAM_LOAD_MAX_BYTES) {
// The buffer capacity exceeds the stream load limit, flush
boolean flush = bufferFullFlush(bufferKey);
LOG.info("trigger flush by buffer exceeding the limit, flush: {}", flush);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ public void testEquals() {
.setWriteMode(WriteMode.STREAM_LOAD)
.setLabelPrefix("doris")
.enable2PC()
.setBufferFlushMaxBytes(524288001)
.setBufferFlushMaxBytes(10)
.setBufferFlushIntervalMs(10000)
.setBufferFlushMaxRows(500001)
.setBufferFlushMaxRows(12)
.setCheckInterval(10)
.setIgnoreCommitError(true)
.setDeletable(true)
Expand All @@ -72,9 +72,9 @@ public void testEquals() {
.setWriteMode(WriteMode.STREAM_LOAD)
.setLabelPrefix("doris")
.enable2PC()
.setBufferFlushMaxBytes(524288001)
.setBufferFlushMaxBytes(10)
.setBufferFlushIntervalMs(10000)
.setBufferFlushMaxRows(500001)
.setBufferFlushMaxRows(12)
.setCheckInterval(10)
.setIgnoreCommitError(true)
.setDeletable(true)
Expand Down Expand Up @@ -111,17 +111,17 @@ public void testEquals() {
Assert.assertNotEquals(exceptOptions, builder.build());
builder.enable2PC();

builder.setBufferFlushMaxBytes(524288002);
builder.setBufferFlushMaxBytes(11);
Assert.assertNotEquals(exceptOptions, builder.build());
builder.setBufferFlushMaxBytes(524288001);
builder.setBufferFlushMaxBytes(10);

builder.setBufferFlushIntervalMs(100001);
Assert.assertNotEquals(exceptOptions, builder.build());
builder.setBufferFlushIntervalMs(10000);

builder.setBufferFlushMaxRows(500002);
builder.setBufferFlushMaxRows(2);
Assert.assertNotEquals(exceptOptions, builder.build());
builder.setBufferFlushMaxRows(500001);
builder.setBufferFlushMaxRows(12);

builder.setCheckInterval(11);
Assert.assertNotEquals(exceptOptions, builder.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ public void testTableBatch() throws Exception {
+ " 'sink.enable.batch-mode' = 'true',"
+ " 'sink.enable-delete' = 'true',"
+ " 'sink.flush.queue-size' = '2',"
+ " 'sink.buffer-flush.max-rows' = '100000',"
+ " 'sink.buffer-flush.max-bytes' = '50MB',"
+ " 'sink.buffer-flush.max-rows' = '1',"
+ " 'sink.buffer-flush.max-bytes' = '5',"
+ " 'sink.buffer-flush.interval' = '10s'"
+ ")",
getFenodes(),
Expand Down Expand Up @@ -295,8 +295,8 @@ public void testTableGroupCommit() throws Exception {
+ " 'sink.enable.batch-mode' = 'true',"
+ " 'sink.enable-delete' = 'true',"
+ " 'sink.flush.queue-size' = '2',"
+ " 'sink.buffer-flush.max-rows' = '300000',"
+ " 'sink.buffer-flush.max-bytes' = '50MB',"
+ " 'sink.buffer-flush.max-rows' = '3',"
+ " 'sink.buffer-flush.max-bytes' = '5000',"
+ " 'sink.buffer-flush.interval' = '10s'"
+ ")",
getFenodes(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ public void testDorisSinkProperties() {
properties.put("sink.use-cache", "true");
properties.put("sink.enable.batch-mode", "true");
properties.put("sink.flush.queue-size", "2");
properties.put("sink.buffer-flush.max-rows", "50000");
properties.put("sink.buffer-flush.max-bytes", "50MB");
properties.put("sink.buffer-flush.max-rows", "1000");
properties.put("sink.buffer-flush.max-bytes", "10MB");
properties.put("sink.buffer-flush.interval", "10s");
properties.put("sink.ignore.update-before", "true");
properties.put("sink.ignore.commit-error", "false");
Expand Down Expand Up @@ -165,8 +165,8 @@ public void testDorisSinkProperties() {
.setBatchMode(true)
.enable2PC()
.setBufferFlushIntervalMs(10000)
.setBufferFlushMaxBytes(50 * 1024 * 1024)
.setBufferFlushMaxRows(50000)
.setBufferFlushMaxBytes(10 * 1024 * 1024)
.setBufferFlushMaxRows(1000)
.setFlushQueueSize(2)
.setUseCache(true)
.setIgnoreCommitError(false)
Expand Down

0 comments on commit 336f865

Please sign in to comment.