Skip to content

Commit

Permalink
[fix] Fixed the issue that batchwriter may be blocked when writing to…
Browse files Browse the repository at this point in the history
… multiple tables (apache#511)
  • Loading branch information
JNSimba authored Nov 20, 2024
1 parent 7285212 commit ee94f55
Showing 1 changed file with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public DorisBatchStreamLoad(
* @param record
* @throws IOException
*/
public synchronized void writeRecord(String database, String table, byte[] record) {
public void writeRecord(String database, String table, byte[] record) {
checkFlushException();
String bufferKey = getTableIdentifier(database, table);

Expand Down Expand Up @@ -228,15 +228,15 @@ public synchronized void writeRecord(String database, String table, byte[] recor
}
}

public synchronized boolean bufferFullFlush(String bufferKey) {
public boolean bufferFullFlush(String bufferKey) {
return doFlush(bufferKey, false, true);
}

public synchronized boolean intervalFlush() {
public boolean intervalFlush() {
return doFlush(null, false, false);
}

public synchronized boolean checkpointFlush() {
public boolean checkpointFlush() {
return doFlush(null, true, false);
}

Expand All @@ -254,6 +254,10 @@ private synchronized boolean doFlush(
}

private synchronized boolean flush(String bufferKey, boolean waitUtilDone) {
if (bufferMap.isEmpty()) {
// bufferMap may have been flushed by other threads
return false;
}
if (null == bufferKey) {
boolean flush = false;
for (String key : bufferMap.keySet()) {
Expand All @@ -270,7 +274,7 @@ private synchronized boolean flush(String bufferKey, boolean waitUtilDone) {
} else if (bufferMap.containsKey(bufferKey)) {
flushBuffer(bufferKey);
} else {
throw new DorisBatchLoadException("buffer not found for key: " + bufferKey);
LOG.warn("buffer not found for key: {}, may be already flushed.", bufferKey);
}
if (waitUtilDone) {
waitAsyncLoadFinish();
Expand All @@ -281,6 +285,7 @@ private synchronized boolean flush(String bufferKey, boolean waitUtilDone) {
private synchronized void flushBuffer(String bufferKey) {
BatchRecordBuffer buffer = bufferMap.get(bufferKey);
buffer.setLabelName(labelGenerator.generateBatchLabel(buffer.getTable()));
LOG.debug("flush buffer for key {} with label {}", bufferKey, buffer.getLabelName());
putRecordToFlushQueue(buffer);
bufferMap.remove(bufferKey);
}
Expand Down Expand Up @@ -408,11 +413,6 @@ public void run() {
load(bf.getLabelName(), bf);
}
}

if (flushQueue.size() < flushQueueSize) {
// Avoid waiting for 2 rounds of intervalMs
doFlush(null, false, false);
}
} catch (Exception e) {
LOG.error("worker running error", e);
exception.set(e);
Expand Down

0 comments on commit ee94f55

Please sign in to comment.