Skip to content

Commit

Permalink
add ut and fix
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Jan 30, 2024
1 parent 8053595 commit 39746d2
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,11 @@ public class ResponseUtil {
public static boolean isCommitted(String msg) {
return COMMITTED_PATTERN.matcher(msg).find();
}

static final Pattern COPY_COMMITTED_PATTERN =
Pattern.compile("errCode = 2, detailMessage = No files can be copied.*");

public static boolean isCopyCommitted(String msg) {
return COPY_COMMITTED_PATTERN.matcher(msg).find();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import org.apache.flink.util.Preconditions;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ public int getVersion() {
}

@Override
public byte[] serialize(DorisCopyCommittable selectdbCommittable) throws IOException {
public byte[] serialize(DorisCopyCommittable copyCommittable) throws IOException {
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(baos)) {
out.writeUTF(selectdbCommittable.getHostPort());
out.writeUTF(copyCommittable.getHostPort());

// writeUTF has a length limit, but the copysql is sometimes very long
final byte[] copySqlBytes =
selectdbCommittable.getCopySQL().getBytes(StandardCharsets.UTF_8);
copyCommittable.getCopySQL().getBytes(StandardCharsets.UTF_8);
out.writeInt(copySqlBytes.length);
out.write(copySqlBytes);
out.flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public String buildCopySQL() {
return sb.toString();
}

static List<String> PREFIX_LIST =
static final List<String> PREFIX_LIST =
Arrays.asList(FIELD_DELIMITER_KEY, LINE_DELIMITER_KEY, STRIP_OUT_ARRAY);

private String concatPropPrefix(String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public int hashCode() {

@Override
public String toString() {
return "SelectdbCommittable{"
return "DorisCommittable{"
+ "hostPort='"
+ hostPort
+ '\''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public boolean handleCommitResponse(String loadResult) throws IOException {
Map<String, String> result = dataResp.getResult();
if (CollectionUtil.isNullOrEmpty(result)
|| (!result.get("state").equals("FINISHED")
&& !ResponseUtil.isCommitted(result.get("msg")))) {
&& !ResponseUtil.isCopyCommitted(result.get("msg")))) {
LOG.error("copy into load failed, reason:{}", loadResult);
return false;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.flink.sink.copy;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
Expand Down Expand Up @@ -48,6 +49,7 @@ public class DorisCopyWriter<IN>
implements DorisAbstractWriter<IN, DorisWriterState, DorisCopyCommittable> {

private static final Logger LOG = LoggerFactory.getLogger(DorisCopyWriter.class);
private final long lastCheckpointId;
private BatchStageLoad batchStageLoad;
private final DorisOptions dorisOptions;
private final DorisReadOptions dorisReadOptions;
Expand Down Expand Up @@ -75,6 +77,11 @@ public DorisCopyWriter(
this.database = tableInfo[0];
this.table = tableInfo[1];
}
this.lastCheckpointId =
initContext
.getRestoredCheckpointId()
.orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
LOG.info("restore checkpointId {}", lastCheckpointId);
LOG.info("labelPrefix " + executionOptions.getLabelPrefix());
this.labelPrefix =
executionOptions.getLabelPrefix()
Expand All @@ -97,6 +104,7 @@ public void initializeLoad() {
this.batchStageLoad =
new BatchStageLoad(
dorisOptions, dorisReadOptions, executionOptions, labelGenerator);
this.batchStageLoad.setCurrentCheckpointID(lastCheckpointId + 1);
// when uploading data in streaming mode, we need to regularly detect whether there are
// exceptions.
scheduledExecutorService.scheduleWithFixedDelay(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.sink.copy;

import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.CopyLoadException;
import org.apache.doris.flink.sink.HttpEntityMock;
import org.apache.doris.flink.sink.OptionUtils;
import org.apache.doris.flink.sink.committer.MockCommitRequest;
import org.apache.http.ProtocolVersion;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.message.BasicStatusLine;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.Collections;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/** Test for Doris Copy Committer. */
public class TestDorisCopyCommitter {

DorisCopyCommitter copyCommitter;
DorisCopyCommittable copyCommittable;
HttpEntityMock entityMock;

@Before
public void setUp() throws Exception {
DorisOptions dorisOptions = OptionUtils.buildDorisOptions();
copyCommittable = new DorisCopyCommittable("127.0.0.1:8710", "copy into sql");
CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
entityMock = new HttpEntityMock();
CloseableHttpResponse httpResponse = mock(CloseableHttpResponse.class);
StatusLine normalLine = new BasicStatusLine(new ProtocolVersion("http", 1, 0), 200, "");
when(httpClient.execute(any())).thenReturn(httpResponse);
when(httpResponse.getStatusLine()).thenReturn(normalLine);
when(httpResponse.getEntity()).thenReturn(entityMock);
copyCommitter = new DorisCopyCommitter(dorisOptions, 1, httpClient);
}

@Test
public void testCommitted() throws Exception {
String response =
"{\"msg\":\"success\",\"code\":0,\"data\":{\"result\":{\"msg\":\"\",\"loadedRows\":\"2\",\"state\":\"FINISHED\",\"type\":\"\",\"filterRows\":\"0\",\"unselectRows\":\"0\",\"url\":null},\"time\":5230,\"type\":\"result_set\"},\"count\":0}";
this.entityMock.setValue(response);
final MockCommitRequest<DorisCopyCommittable> request =
new MockCommitRequest<>(copyCommittable);
copyCommitter.commit(Collections.singletonList(request));
}

@Test(expected = CopyLoadException.class)
public void testCommitedError() throws Exception {
String response =
"{\"msg\":\"success\",\"code\":0,\"data\":{\"result\":{\"msg\":\"errCode = 2, detailMessage = No source file in this table(table).\",\"loadedRows\":\"\",\"state\":\"CANCELLED\",\"type\":\"ETL_RUN_FAIL\",\"filterRows\":\"\",\"unselectRows\":\"\",\"url\":null},\"time\":5255,\"type\":\"result_set\"},\"count\":0}";
this.entityMock.setValue(response);
final MockCommitRequest<DorisCopyCommittable> request =
new MockCommitRequest<>(copyCommittable);
copyCommitter.commit(Collections.singletonList(request));
}

@Test
public void testHandleCommitResponse() throws Exception {
String loadResult =
"{\"msg\":\"Error\",\"code\":1,\"data\":\"Failed to execute sql: java.lang.ClassCastException: java.util.LinkedHashMap$Entry cannot be cast to java.util.HashMap$TreeNode\",\"count\":0}";
Assert.assertFalse(copyCommitter.handleCommitResponse(loadResult));

loadResult =
"{\"msg\":\"success\",\"code\":0,\"data\":{\"result\":{\"msg\":\"errCode = 2, detailMessage = , host: 10.62.1.219\",\"loadedRows\":\"\",\"id\":\"88a895b14bf84184-9a061fd09f125b10\",\"state\":\"CANCELLED\",\"type\":\"LOAD_RUN_FAIL\",\"filterRows\":\"\",\"unselectRows\":\"\",\"url\":null},\"time\":6098,\"type\":\"result_set\"},\"count\":0} ";
Assert.assertFalse(copyCommitter.handleCommitResponse(loadResult));

loadResult =
"{\"msg\":\"success\","
+ "\"code\":0,\"data\":{\"result\":{\"msg\":\"errCode = 2, detailMessage = There is no scanNode Backend available.[2305966: not alive]\",\"loadedRows\":\"\",\"id\":\"c301fd3c88f946f1-98d05"
+ "4ddae4106ae\",\"state\":\"CANCELLED\",\"type\":\"LOAD_RUN_FAIL\",\"filterRows\":\"\",\"unselectRows\":\"\",\"url\":null},\"time\":10092,\"type\":\"result_set\"},\"count\":0} ";
Assert.assertFalse(copyCommitter.handleCommitResponse(loadResult));

loadResult =
"{\"msg\":\"Error\",\"code\":1,\"data\":\"Failed to execute sql: java.sql.SQLException: (conn=44217) Exception, msg: Node catalog is not ready, please wait for a while.\",\"count\":0}";
Assert.assertFalse(copyCommitter.handleCommitResponse(loadResult));

loadResult =
"{\"msg\":\"success\",\"code\":0,\"data\":{\"result\":{\"msg\":\"\",\"loadedRows\":\"2399\",\"id\":\"31734d4274964740-ac2c022b6dfbf658\",\"state\":\"FINISHED\",\"type\":\"\",\"filterRows\":\"0\",\"unselectRows\":\"0\",\"url\":null},\"time\":54974,\"type\":\"result_set\"},\"count\":0}";
Assert.assertTrue(copyCommitter.handleCommitResponse(loadResult));

loadResult =
"{\"msg\":\"success\",\"code\":0,\"data\":{\"result\":{\"msg\":\"errCode = 2, detailMessage = No files can be copied, matched 1 files, filtered 1 files because files may be loading or loaded\",\"loadedRows\":\"\",\"id\":\"b86cb31213014886-91bc97e8df01676f\",\"state\":\"CANCELLED\",\"type\":\"ETL_RUN_FAIL\",\"filterRows\":\"\",\"unselectRows\":\"\",\"url\":null},\"time\":5019,\"type\":\"result_set\"},\"count\":0}";
Assert.assertTrue(copyCommitter.handleCommitResponse(loadResult));
}
}

0 comments on commit 39746d2

Please sign in to comment.