Skip to content

Commit

Permalink
add copy mode
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Jan 30, 2024
1 parent 9d7626f commit 8053595
Show file tree
Hide file tree
Showing 18 changed files with 1,355 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.exception;

public class CopyLoadException extends DorisRuntimeException {
public CopyLoadException() {
super();
}

public CopyLoadException(String message) {
super(message);
}

public CopyLoadException(String message, Throwable cause) {
super(message, cause);
}

public CopyLoadException(Throwable cause) {
super(cause);
}

protected CopyLoadException(
String message,
Throwable cause,
boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.sink;

public interface DorisAbstractCommittable {}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.Objects;

/** DorisCommittable hold the info for Committer to commit. */
public class DorisCommittable {
public class DorisCommittable implements DorisAbstractCommittable {
private final String hostPort;
private final String db;
private final long txnID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.sink.batch.DorisBatchWriter;
import org.apache.doris.flink.sink.committer.DorisCommitter;
import org.apache.doris.flink.sink.copy.CopyCommittableSerializer;
import org.apache.doris.flink.sink.copy.DorisCopyCommitter;
import org.apache.doris.flink.sink.copy.DorisCopyWriter;
import org.apache.doris.flink.sink.writer.DorisAbstractWriter;
import org.apache.doris.flink.sink.writer.DorisWriter;
import org.apache.doris.flink.sink.writer.DorisWriterState;
Expand All @@ -49,8 +52,7 @@
*/
public class DorisSink<IN>
implements StatefulSink<IN, DorisWriterState>,
TwoPhaseCommittingSink<IN, DorisCommittable> {

TwoPhaseCommittingSink<IN, DorisAbstractCommittable> {
private static final Logger LOG = LoggerFactory.getLogger(DorisSink.class);
private final DorisOptions dorisOptions;
private final DorisReadOptions dorisReadOptions;
Expand Down Expand Up @@ -84,9 +86,16 @@ public DorisAbstractWriter createWriter(InitContext initContext) throws IOExcept
}

@Override
public Committer<DorisCommittable> createCommitter() throws IOException {
return new DorisCommitter(
dorisOptions, dorisReadOptions, dorisExecutionOptions.getMaxRetries());
public Committer createCommitter() throws IOException {
if (WriteMode.STREAM_LOAD.equals(dorisExecutionOptions.getWriteMode())
|| WriteMode.STREAM_LOAD_BATCH.equals(dorisExecutionOptions.getWriteMode())) {
return new DorisCommitter(
dorisOptions, dorisReadOptions, dorisExecutionOptions.getMaxRetries());
} else if (WriteMode.COPY.equals(dorisExecutionOptions.getWriteMode())) {
return new DorisCopyCommitter(dorisOptions, dorisExecutionOptions.getMaxRetries());
}
throw new IllegalArgumentException(
"Unsupported write mode " + dorisExecutionOptions.getWriteMode());
}

@Override
Expand All @@ -109,6 +118,9 @@ public DorisAbstractWriter getDorisAbstractWriter(
} else if (WriteMode.STREAM_LOAD_BATCH.equals(dorisExecutionOptions.getWriteMode())) {
return new DorisBatchWriter<>(
initContext, serializer, dorisOptions, dorisReadOptions, dorisExecutionOptions);
} else if (WriteMode.COPY.equals(dorisExecutionOptions.getWriteMode())) {
return new DorisCopyWriter(
initContext, serializer, dorisOptions, dorisReadOptions, dorisExecutionOptions);
}
throw new IllegalArgumentException(
"Unsupported write mode " + dorisExecutionOptions.getWriteMode());
Expand All @@ -120,8 +132,15 @@ public SimpleVersionedSerializer<DorisWriterState> getWriterStateSerializer() {
}

@Override
public SimpleVersionedSerializer<DorisCommittable> getCommittableSerializer() {
return new DorisCommittableSerializer();
public SimpleVersionedSerializer getCommittableSerializer() {
if (WriteMode.STREAM_LOAD.equals(dorisExecutionOptions.getWriteMode())
|| WriteMode.STREAM_LOAD_BATCH.equals(dorisExecutionOptions.getWriteMode())) {
return new DorisCommittableSerializer();
} else if (WriteMode.COPY.equals(dorisExecutionOptions.getWriteMode())) {
return new CopyCommittableSerializer();
}
throw new IllegalArgumentException(
"Unsupported write mode " + dorisExecutionOptions.getWriteMode());
}

public static <IN> Builder<IN> builder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public HttpPutBuilder addHiddenColumns(boolean add) {
return this;
}

public HttpPutBuilder addFileName(String fileName) {
header.put("fileName", fileName);
return this;
}

public HttpPutBuilder enable2PC() {
header.put("two_phase_commit", "true");
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.doris.flink.sink;

import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.NoConnectionReuseStrategy;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
Expand All @@ -37,4 +39,22 @@ protected boolean isRedirectable(String method) {
public CloseableHttpClient getHttpClient() {
return httpClientBuilder.build();
}

private RequestConfig requestConfig =
RequestConfig.custom()
.setConnectTimeout(60 * 1000)
.setConnectionRequestTimeout(60 * 1000)
// default checkpoint timeout is 10min
.setSocketTimeout(9 * 60 * 1000)
.build();

private final HttpClientBuilder httpClientBuilderWithTimeout =
HttpClients.custom().setDefaultRequestConfig(requestConfig);

public CloseableHttpClient getHttpClientWithTimeout() {
return httpClientBuilderWithTimeout
// fix failed to respond for commit copy
.setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.sink.copy;

import org.apache.doris.flink.exception.DorisRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BackoffAndRetryUtils {

private static final Logger LOG = LoggerFactory.getLogger(BackoffAndRetryUtils.class);

// backoff with 1, 2, 4 seconds
private static final int[] backoffSec = {0, 1, 2, 4};

/** Interfaces to define the lambda function to be used by backoffAndRetry. */
public interface BackoffFunction {
Object apply() throws Exception;
}

public static Object backoffAndRetry(
final LoadOperation operation, final BackoffFunction runnable) throws Exception {
String error = "";
Throwable resExp = null;
for (int index = 0; index < backoffSec.length; index++) {
if (index != 0) {
int second = backoffSec[index];
Thread.sleep(second * 1000L);
LOG.info("Retry operation {} {} times", operation, index);
}
try {
return runnable.apply();
} catch (Exception e) {
resExp = e;
error = e.getMessage();
LOG.error(
"Request failed, caught an exception for operation {} with message:{}",
operation,
e.getMessage());
}
}
String errMsg =
String.format(
"Retry exceeded the max retry limit, operation = %s, error message is %s",
operation, error);
LOG.error(errMsg, resExp);
throw new DorisRuntimeException(errMsg);
}

public enum LoadOperation {
GET_INTERNAL_STAGE_ADDRESS,
UPLOAD_FILE,
}
}
Loading

0 comments on commit 8053595

Please sign in to comment.