diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/CopyLoadException.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/CopyLoadException.java
new file mode 100644
index 000000000..d8b80d94f
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/CopyLoadException.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.exception;
+
+public class CopyLoadException extends DorisRuntimeException {
+    public CopyLoadException() {
+        super();
+    }
+
+    public CopyLoadException(String message) {
+        super(message);
+    }
+
+    public CopyLoadException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public CopyLoadException(Throwable cause) {
+        super(cause);
+    }
+
+    protected CopyLoadException(
+            String message,
+            Throwable cause,
+            boolean enableSuppression,
+            boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisAbstractCommittable.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisAbstractCommittable.java
new file mode 100644
index 000000000..7e1c7e4ee
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisAbstractCommittable.java
@@ -0,0 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink;
+
+public interface DorisAbstractCommittable {}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisCommittable.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisCommittable.java
index ab6ee1f6d..78560fda5 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisCommittable.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisCommittable.java
@@ -20,7 +20,7 @@
 import java.util.Objects;
 
 /** DorisCommittable hold the info for Committer to commit. */
-public class DorisCommittable {
+public class DorisCommittable implements DorisAbstractCommittable {
     private final String hostPort;
     private final String db;
     private final long txnID;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
index 6a3477651..e9ba2ebdd 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
@@ -29,6 +29,9 @@
 import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.flink.sink.batch.DorisBatchWriter;
 import org.apache.doris.flink.sink.committer.DorisCommitter;
+import org.apache.doris.flink.sink.copy.CopyCommittableSerializer;
+import org.apache.doris.flink.sink.copy.DorisCopyCommitter;
+import org.apache.doris.flink.sink.copy.DorisCopyWriter;
 import org.apache.doris.flink.sink.writer.DorisAbstractWriter;
 import org.apache.doris.flink.sink.writer.DorisWriter;
 import org.apache.doris.flink.sink.writer.DorisWriterState;
@@ -49,8 +52,7 @@
  */
 public class DorisSink<IN>
         implements StatefulSink<IN, DorisWriterState>,
-                TwoPhaseCommittingSink<IN, DorisCommittable> {
-
+                TwoPhaseCommittingSink<IN, DorisAbstractCommittable> {
     private static final Logger LOG = LoggerFactory.getLogger(DorisSink.class);
     private final DorisOptions dorisOptions;
     private final DorisReadOptions dorisReadOptions;
@@ -84,9 +86,16 @@ public DorisAbstractWriter createWriter(InitContext initContext) throws IOExcept
     }
 
     @Override
-    public Committer<DorisCommittable> createCommitter() throws IOException {
-        return new DorisCommitter(
-                dorisOptions, dorisReadOptions, dorisExecutionOptions.getMaxRetries());
+    public Committer<DorisAbstractCommittable> createCommitter() throws IOException {
+        if (WriteMode.STREAM_LOAD.equals(dorisExecutionOptions.getWriteMode())
+                || WriteMode.STREAM_LOAD_BATCH.equals(dorisExecutionOptions.getWriteMode())) {
+            return new DorisCommitter(
+                    dorisOptions, dorisReadOptions, dorisExecutionOptions.getMaxRetries());
+        } else if (WriteMode.COPY.equals(dorisExecutionOptions.getWriteMode())) {
+            return new DorisCopyCommitter(dorisOptions, dorisExecutionOptions.getMaxRetries());
+        }
+        throw new IllegalArgumentException(
+                "Unsupported write mode " + dorisExecutionOptions.getWriteMode());
     }
 
     @Override
@@ -109,6 +118,9 @@ public DorisAbstractWriter getDorisAbstractWriter(
         } else if (WriteMode.STREAM_LOAD_BATCH.equals(dorisExecutionOptions.getWriteMode())) {
             return new DorisBatchWriter<>(
                     initContext, serializer, dorisOptions, dorisReadOptions, dorisExecutionOptions);
+        } else if (WriteMode.COPY.equals(dorisExecutionOptions.getWriteMode())) {
+            return new DorisCopyWriter<>(
+                    initContext, serializer, dorisOptions, dorisReadOptions, dorisExecutionOptions);
         }
         throw new IllegalArgumentException(
                 "Unsupported write mode " + dorisExecutionOptions.getWriteMode());
@@ -120,8 +132,15 @@ public SimpleVersionedSerializer<DorisWriterState> getWriterStateSerializer() {
     }
 
     @Override
-    public SimpleVersionedSerializer<DorisCommittable> getCommittableSerializer() {
-        return new DorisCommittableSerializer();
+    public SimpleVersionedSerializer<DorisAbstractCommittable> getCommittableSerializer() {
+        if (WriteMode.STREAM_LOAD.equals(dorisExecutionOptions.getWriteMode())
+                || WriteMode.STREAM_LOAD_BATCH.equals(dorisExecutionOptions.getWriteMode())) {
+            return new DorisCommittableSerializer();
+        } else if (WriteMode.COPY.equals(dorisExecutionOptions.getWriteMode())) {
+            return new CopyCommittableSerializer();
+        }
+        throw new IllegalArgumentException(
+                "Unsupported write mode " + dorisExecutionOptions.getWriteMode());
     }
 
     public static Builder builder() {
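For orientation, selecting the new mode from user code is just a matter of setting it on `DorisExecutionOptions`. A minimal sketch: the `setWriteMode(...)` and `WriteMode.of(...)` calls are confirmed by this diff (see the `DatabaseSync` hunk at the end), while the remaining builder calls are the connector's pre-existing API and are assumed here.

```java
// Sketch: route the sink through DorisCopyWriter / DorisCopyCommitter.
DorisExecutionOptions executionOptions =
        DorisExecutionOptions.builder()
                .setLabelPrefix("copy-demo")
                .setWriteMode(WriteMode.COPY)
                .build();

DorisSink<String> sink =
        DorisSink.<String>builder()
                .setDorisOptions(dorisOptions) // fenodes, username, password, table identifier
                .setDorisReadOptions(DorisReadOptions.builder().build())
                .setDorisExecutionOptions(executionOptions)
                .setSerializer(new SimpleStringSerializer())
                .build();
```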
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java
index a833e2ef9..023cd31a4 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java
@@ -58,6 +58,11 @@ public HttpPutBuilder addHiddenColumns(boolean add) {
         return this;
     }
 
+    public HttpPutBuilder addFileName(String fileName) {
+        header.put("fileName", fileName);
+        return this;
+    }
+
     public HttpPutBuilder enable2PC() {
         header.put("two_phase_commit", "true");
         return this;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
index 7ad87f692..d081c40de 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpUtil.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.flink.sink;
 
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.impl.NoConnectionReuseStrategy;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.DefaultRedirectStrategy;
 import org.apache.http.impl.client.HttpClientBuilder;
@@ -37,4 +39,22 @@ protected boolean isRedirectable(String method) {
     public CloseableHttpClient getHttpClient() {
         return httpClientBuilder.build();
     }
+
+    private RequestConfig requestConfig =
+            RequestConfig.custom()
+                    .setConnectTimeout(60 * 1000)
+                    .setConnectionRequestTimeout(60 * 1000)
+                    // default checkpoint timeout is 10min
+                    .setSocketTimeout(9 * 60 * 1000)
+                    .build();
+
+    private final HttpClientBuilder httpClientBuilderWithTimeout =
+            HttpClients.custom().setDefaultRequestConfig(requestConfig);
+
+    public CloseableHttpClient getHttpClientWithTimeout() {
+        return httpClientBuilderWithTimeout
+                // fix failed to respond for commit copy
+                .setConnectionReuseStrategy(NoConnectionReuseStrategy.INSTANCE)
+                .build();
+    }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BackoffAndRetryUtils.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BackoffAndRetryUtils.java
new file mode 100644
index 000000000..4a9389c21
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BackoffAndRetryUtils.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.copy;
+
+import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BackoffAndRetryUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BackoffAndRetryUtils.class);
+
+    // first attempt runs immediately, then back off 1, 2, 4 seconds between retries
+    private static final int[] backoffSec = {0, 1, 2, 4};
+
+    /** Interface defining the lambda function to be used by backoffAndRetry. */
+    public interface BackoffFunction {
+        Object apply() throws Exception;
+    }
+
+    public static Object backoffAndRetry(
+            final LoadOperation operation, final BackoffFunction runnable) throws Exception {
+        String error = "";
+        Throwable resExp = null;
+        for (int index = 0; index < backoffSec.length; index++) {
+            if (index != 0) {
+                int second = backoffSec[index];
+                Thread.sleep(second * 1000L);
+                LOG.info("Retry operation {} {} times", operation, index);
+            }
+            try {
+                return runnable.apply();
+            } catch (Exception e) {
+                resExp = e;
+                error = e.getMessage();
+                LOG.error(
+                        "Request failed, caught an exception for operation {} with message:{}",
+                        operation,
+                        e.getMessage());
+            }
+        }
+        String errMsg =
+                String.format(
+                        "Retry exceeded the max retry limit, operation = %s, error message is %s",
+                        operation, error);
+        LOG.error(errMsg, resExp);
+        throw new DorisRuntimeException(errMsg);
+    }
+
+    public enum LoadOperation {
+        GET_INTERNAL_STAGE_ADDRESS,
+        UPLOAD_FILE,
+    }
+}
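The helper runs the lambda up to four times, sleeping 0/1/2/4 seconds before each attempt, and rethrows a `DorisRuntimeException` once every attempt has failed. A minimal caller sketch; `fetchUploadAddress()` is a hypothetical stand-in for any transiently failing operation:

```java
// Sketch: retry a transient HTTP call; fetchUploadAddress() is hypothetical.
String address =
        (String)
                BackoffAndRetryUtils.backoffAndRetry(
                        BackoffAndRetryUtils.LoadOperation.GET_INTERNAL_STAGE_ADDRESS,
                        () -> fetchUploadAddress());
```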
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java
new file mode 100644
index 000000000..4bbbfa5b4
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java
@@ -0,0 +1,414 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.copy;
+
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.exception.CopyLoadException;
+import org.apache.doris.flink.exception.DorisBatchLoadException;
+import org.apache.doris.flink.sink.EscapeHandler;
+import org.apache.doris.flink.sink.HttpPutBuilder;
+import org.apache.doris.flink.sink.HttpUtil;
+import org.apache.doris.flink.sink.batch.BatchRecordBuffer;
+import org.apache.doris.flink.sink.writer.LabelGenerator;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
+
+/** Load data via stage load: upload files to the internal stage, then copy into the table. */
+public class BatchStageLoad implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = LoggerFactory.getLogger(BatchStageLoad.class);
+    private final LabelGenerator labelGenerator;
+    private final byte[] lineDelimiter;
+    private static final String UPLOAD_URL_PATTERN = "http://%s/copy/upload";
+    private static final String LINE_DELIMITER_KEY_WITH_PREFIX = "file.line_delimiter";
+    private String uploadUrl;
+    private String hostPort;
+    private final String username;
+    private final String password;
+    private final Properties loadProps;
+    private Map<String, BatchRecordBuffer> bufferMap = new ConcurrentHashMap<>();
+    private Map<String, List<String>> fileListMap = new ConcurrentHashMap<>();
+    private long currentCheckpointID;
+    private AtomicInteger fileNum;
+    private DorisExecutionOptions executionOptions;
+    private ExecutorService loadExecutorService;
+    private StageLoadAsyncExecutor loadAsyncExecutor;
+    private BlockingQueue<BatchRecordBuffer> flushQueue;
+    private final AtomicBoolean started;
+    private volatile boolean loadThreadAlive = false;
+    private AtomicReference<Throwable> exception = new AtomicReference<>(null);
+    private CloseableHttpClient httpClient = new HttpUtil().getHttpClientWithTimeout();
+
+    public BatchStageLoad(
+            DorisOptions dorisOptions,
+            DorisReadOptions dorisReadOptions,
+            DorisExecutionOptions executionOptions,
+            LabelGenerator labelGenerator) {
+        this.username = dorisOptions.getUsername();
+        this.password = dorisOptions.getPassword();
+        this.loadProps = executionOptions.getStreamLoadProp();
+        this.labelGenerator = labelGenerator;
+        this.hostPort = dorisOptions.getFenodes();
+        this.uploadUrl = String.format(UPLOAD_URL_PATTERN, hostPort);
+        this.fileNum = new AtomicInteger();
+        this.lineDelimiter =
+                EscapeHandler.escapeString(
+                                loadProps.getProperty(
+                                        LINE_DELIMITER_KEY_WITH_PREFIX, LINE_DELIMITER_DEFAULT))
+                        .getBytes();
+        this.executionOptions = executionOptions;
+        this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize());
+        this.loadAsyncExecutor = new StageLoadAsyncExecutor();
+        this.loadExecutorService =
+                new ThreadPoolExecutor(
+                        1,
+                        1,
+                        0L,
+                        TimeUnit.MILLISECONDS,
+                        new LinkedBlockingQueue<>(1),
+                        new DefaultThreadFactory("copy-executor"),
+                        new ThreadPoolExecutor.AbortPolicy());
+        this.started = new AtomicBoolean(true);
+        this.loadExecutorService.execute(loadAsyncExecutor);
+    }
+
+    /**
+     * write record into cache.
+     *
+     * @param database the target database
+     * @param table the target table
+     * @param record the serialized row
+     * @throws InterruptedException if interrupted while flushing
+     */
+    public synchronized void writeRecord(String database, String table, byte[] record)
+            throws InterruptedException {
+        checkFlushException();
+        String bufferKey = getTableIdentifier(database, table);
+        BatchRecordBuffer buffer =
+                bufferMap.computeIfAbsent(
+                        bufferKey,
+                        k ->
+                                new BatchRecordBuffer(
+                                        database,
+                                        table,
+                                        this.lineDelimiter,
+                                        executionOptions.getBufferFlushMaxBytes()));
+        buffer.insert(record);
+        // Flush when the buffer exceeds 80% of the max byte size, to avoid triggering
+        // ByteBuffer expansion
+        if (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes() * 0.8
+                || (executionOptions.getBufferFlushMaxRows() != 0
+                        && buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) {
+            flush(bufferKey, false);
+        }
+    }
+
+    public synchronized void flush(String bufferKey, boolean waitUtilDone)
+            throws InterruptedException {
+        checkFlushException();
+        if (null == bufferKey) {
+            for (String key : bufferMap.keySet()) {
+                flushBuffer(key);
+            }
+        } else if (bufferMap.containsKey(bufferKey)) {
+            flushBuffer(bufferKey);
+        }
+
+        if (waitUtilDone) {
+            waitAsyncLoadFinish();
+        }
+    }
+
+    private synchronized void flushBuffer(String bufferKey) {
+        BatchRecordBuffer buffer = bufferMap.get(bufferKey);
+        buffer.setLabelName(
+                labelGenerator.generateCopyBatchLabel(
+                        buffer.getTable(), currentCheckpointID, fileNum.getAndIncrement()));
+        putRecordToFlushQueue(buffer);
+        bufferMap.remove(bufferKey);
+    }
+
+    private void putRecordToFlushQueue(BatchRecordBuffer buffer) {
+        checkFlushException();
+        if (!loadThreadAlive) {
+            throw new RuntimeException("load thread already exited, write was interrupted");
+        }
+        try {
+            flushQueue.put(buffer);
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Failed to put record buffer to flush queue");
+        }
+    }
+
+    public void setCurrentCheckpointID(long currentCheckpointID) {
+        this.currentCheckpointID = currentCheckpointID;
+    }
+
+    private void checkFlushException() {
+        if (exception.get() != null) {
+            throw new DorisBatchLoadException(exception.get());
+        }
+    }
+
+    private void waitAsyncLoadFinish() {
+        for (int i = 0; i < executionOptions.getFlushQueueSize() + 1; i++) {
+            BatchRecordBuffer empty = new BatchRecordBuffer();
+            putRecordToFlushQueue(empty);
+        }
+    }
+
+    private String getTableIdentifier(String database, String table) {
+        return database + "." + table;
+    }
+
+    public void close() {
+        // close async executor
+        this.loadExecutorService.shutdown();
+        this.started.set(false);
+        // clear buffer
+        this.flushQueue.clear();
+        this.fileListMap.clear();
+        this.bufferMap.clear();
+    }
+
+    public Map<String, List<String>> getFileListMap() {
+        return fileListMap;
+    }
+
+    public void clearFileListMap() {
+        this.fileNum.set(0);
+        this.fileListMap.clear();
+    }
+
+    class StageLoadAsyncExecutor implements Runnable {
+        @Override
+        public void run() {
+            LOG.info("StageLoadAsyncExecutor start");
+            loadThreadAlive = true;
+            while (started.get()) {
+                BatchRecordBuffer buffer = null;
+                try {
+                    buffer = flushQueue.poll(2000L, TimeUnit.MILLISECONDS);
+                    if (buffer == null) {
+                        continue;
+                    }
+                    if (buffer.getLabelName() != null) {
+                        String bufferKey =
+                                getTableIdentifier(buffer.getDatabase(), buffer.getTable());
+                        uploadToStorage(buffer.getLabelName(), buffer);
+                        fileListMap
+                                .computeIfAbsent(bufferKey, k -> new ArrayList<>())
+                                .add(buffer.getLabelName());
+                    }
+                } catch (Exception e) {
+                    LOG.error("worker running error", e);
+                    exception.set(e);
+                    // clear queue to avoid writer thread blocking
+                    flushQueue.clear();
+                    fileListMap.clear();
+                    bufferMap.clear();
+                    break;
+                }
+            }
+            LOG.info("StageLoadAsyncExecutor stop");
+            loadThreadAlive = false;
+        }
+
+        /*
+         * upload to storage
+         */
+        public void uploadToStorage(String fileName, BatchRecordBuffer buffer) throws IOException {
+            long start = System.currentTimeMillis();
+            LOG.info("file write started for {}", fileName);
+            String address = getUploadAddress(fileName);
+            long addressTs = System.currentTimeMillis();
+            LOG.info(
+                    "redirect to internalStage address:{}, cost {} ms", address, addressTs - start);
+            String requestId = uploadToInternalStage(address, buffer.getData());
+            LOG.info(
+                    "upload file {} finished, record {} size {}, cost {}ms, with requestId {}",
+                    fileName,
+                    buffer.getNumOfRecords(),
+                    buffer.getBufferSizeBytes(),
+                    System.currentTimeMillis() - addressTs,
+                    requestId);
+        }
+
+        public String uploadToInternalStage(String address, ByteBuffer data)
+                throws CopyLoadException {
+            ByteArrayEntity entity =
+                    new ByteArrayEntity(data.array(), data.arrayOffset(), data.limit());
+            HttpPutBuilder putBuilder = new HttpPutBuilder();
+            putBuilder.setUrl(address).addCommonHeader().setEntity(entity);
+            HttpPut httpPut = putBuilder.build();
+            try {
+                Object result =
+                        BackoffAndRetryUtils.backoffAndRetry(
+                                BackoffAndRetryUtils.LoadOperation.UPLOAD_FILE,
+                                () -> {
+                                    try (CloseableHttpResponse response =
+                                            httpClient.execute(httpPut)) {
+                                        final int statusCode =
+                                                response.getStatusLine().getStatusCode();
+                                        String requestId = getRequestId(response.getAllHeaders());
+                                        if (statusCode == 200 && response.getEntity() != null) {
+                                            String loadResult =
+                                                    EntityUtils.toString(response.getEntity());
+                                            if (loadResult == null || loadResult.isEmpty()) {
+                                                // upload finished
+                                                return requestId;
+                                            }
+                                            LOG.error(
+                                                    "upload file failed, requestId is {}, response result: {}",
+                                                    requestId,
+                                                    loadResult);
+                                            throw new CopyLoadException(
+                                                    "upload file failed: "
+                                                            + response.getStatusLine().toString()
+                                                            + ", with requestId "
+                                                            + requestId);
+                                        }
+                                        throw new CopyLoadException(
+                                                "upload file error: "
+                                                        + response.getStatusLine().toString()
+                                                        + ", with requestId "
+                                                        + requestId);
+                                    }
+                                });
+                return String.valueOf(result);
+            } catch (Exception ex) {
+                LOG.error("Failed to upload data to internal stage ", ex);
+                throw new CopyLoadException(
+                        "Failed to upload data to internal stage, " + ex.getMessage());
+            }
+        }
+
+        /*
+         * Get the requestId from the response headers of the upload stage.
+         * Header keys are: x-oss-request-id/x-cos-request-id/x-obs-request-id/x-amz-request-id
+         * @return key:value
+         */
+        public String getRequestId(Header[] headers) {
+            if (headers == null || headers.length == 0) {
+                return null;
+            }
+            for (int i = 0; i < headers.length; i++) {
+                final Header header = headers[i];
+                String name = header.getName();
+                if (name != null && name.toLowerCase().matches("x-\\S+-request-id")) {
+                    return name + ":" + header.getValue();
+                }
+            }
+            return null;
+        }
+
+        /** Get the redirected s3 address. */
+        public String getUploadAddress(String fileName) throws CopyLoadException {
+            HttpPutBuilder putBuilder = new HttpPutBuilder();
+            putBuilder
+                    .setUrl(uploadUrl)
+                    .addFileName(fileName)
+                    .addCommonHeader()
+                    .setEmptyEntity()
+                    .baseAuth(username, password);
+
+            try {
+                Object address =
+                        BackoffAndRetryUtils.backoffAndRetry(
+                                BackoffAndRetryUtils.LoadOperation.GET_INTERNAL_STAGE_ADDRESS,
+                                () -> {
+                                    try (CloseableHttpResponse execute =
+                                            httpClient.execute(putBuilder.build())) {
+                                        int statusCode = execute.getStatusLine().getStatusCode();
+                                        String reason = execute.getStatusLine().getReasonPhrase();
+                                        if (statusCode == 307) {
+                                            Header location = execute.getFirstHeader("location");
+                                            String uploadAddress = location.getValue();
+                                            return uploadAddress;
+                                        } else {
+                                            HttpEntity entity = execute.getEntity();
+                                            String result =
+                                                    entity == null
+                                                            ? null
+                                                            : EntityUtils.toString(entity);
+                                            LOG.error(
+                                                    "Failed to get internalStage address, status {}, reason {}, response {}",
+                                                    statusCode,
+                                                    reason,
+                                                    result);
+                                            throw new CopyLoadException(
+                                                    "Failed to get internalStage address");
+                                        }
+                                    }
+                                });
+                Preconditions.checkNotNull(address, "internalStage address is null");
+                return address.toString();
+            } catch (Exception e) {
+                LOG.error("Get internalStage address error", e);
+                throw new CopyLoadException("Get internalStage address error, " + e.getMessage());
+            }
+        }
+    }
+
+    static class DefaultThreadFactory implements ThreadFactory {
+        private static final AtomicInteger poolNumber = new AtomicInteger(1);
+        private final AtomicInteger threadNumber = new AtomicInteger(1);
+        private final String namePrefix;
+
+        DefaultThreadFactory(String name) {
+            namePrefix = "pool-" + poolNumber.getAndIncrement() + "-" + name + "-";
+        }
+
+        public Thread newThread(Runnable r) {
+            Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
+            t.setDaemon(false);
+            return t;
+        }
+    }
+}
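Pieced together, the writer-side call sequence against `BatchStageLoad` looks roughly like this (simplified from `DorisCopyWriter` further below; the options and label-generator objects are assumed to already exist):

```java
// Sketch of one checkpoint cycle; construction of the option objects is elided.
BatchStageLoad stageLoad =
        new BatchStageLoad(dorisOptions, readOptions, executionOptions, labelGenerator);
stageLoad.setCurrentCheckpointID(1L);
stageLoad.writeRecord("db", "tbl", "1,doris".getBytes(StandardCharsets.UTF_8));
stageLoad.flush(null, true); // checkpoint: block until all buffers are uploaded
Map<String, List<String>> uploaded = stageLoad.getFileListMap(); // stage files per "db.tbl"
```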
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/CopyCommittableSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/CopyCommittableSerializer.java
new file mode 100644
index 000000000..5200e4680
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/CopyCommittableSerializer.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.copy;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/** define how to serialize DorisCopyCommittable. */
+public class CopyCommittableSerializer
+        implements SimpleVersionedSerializer<DorisCopyCommittable> {
+
+    private static final int VERSION = 1;
+
+    @Override
+    public int getVersion() {
+        return VERSION;
+    }
+
+    @Override
+    public byte[] serialize(DorisCopyCommittable committable) throws IOException {
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                final DataOutputStream out = new DataOutputStream(baos)) {
+            out.writeUTF(committable.getHostPort());
+
+            // writeUTF has a length limit, but the copy SQL is sometimes very long
+            final byte[] copySqlBytes = committable.getCopySQL().getBytes(StandardCharsets.UTF_8);
+            out.writeInt(copySqlBytes.length);
+            out.write(copySqlBytes);
+            out.flush();
+            return baos.toByteArray();
+        }
+    }
+
+    @Override
+    public DorisCopyCommittable deserialize(int version, byte[] serialized) throws IOException {
+        try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+                final DataInputStream in = new DataInputStream(bais)) {
+            final String hostPort = in.readUTF();
+
+            // read copySQL; readFully guarantees the whole length-prefixed payload is consumed
+            final int len = in.readInt();
+            final byte[] bytes = new byte[len];
+            in.readFully(bytes);
+            String copySQL = new String(bytes, StandardCharsets.UTF_8);
+            return new DorisCopyCommittable(hostPort, copySQL);
+        }
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/CopySQLBuilder.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/CopySQLBuilder.java
new file mode 100644
index 000000000..e3642673a
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/CopySQLBuilder.java
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.copy;
+
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.StringJoiner;
+
+import static org.apache.doris.flink.sink.writer.LoadConstants.COLUMNS_KEY;
+import static org.apache.doris.flink.sink.writer.LoadConstants.FIELD_DELIMITER_KEY;
+import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
+import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
+import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
+import static org.apache.doris.flink.sink.writer.LoadConstants.READ_JSON_BY_LINE;
+
+public class CopySQLBuilder {
+    private static final String COPY_SYNC = "copy.async";
+    private static final String COPY_DELETE = "copy.use_delete_sign";
+    private static final String STRIP_OUT_ARRAY = "strip_outer_array";
+    private final DorisExecutionOptions executionOptions;
+    private final String tableIdentifier;
+    private final List<String> fileList;
+    private Properties properties;
+
+    public CopySQLBuilder(
+            String tableIdentifier,
+            DorisExecutionOptions executionOptions,
+            List<String> fileList) {
+        this.tableIdentifier = tableIdentifier;
+        this.executionOptions = executionOptions;
+        this.fileList = fileList;
+        this.properties = executionOptions.getStreamLoadProp();
+    }
+
+    public String buildCopySQL() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("COPY INTO ")
+                .append(tableIdentifier)
+                .append(" FROM @~('{")
+                .append(String.join(",", fileList))
+                .append("}') ")
+                .append("PROPERTIES (");
+
+        // copy into must be sync
+        properties.put(COPY_SYNC, false);
+        if (executionOptions.getDeletable()) {
+            properties.put(COPY_DELETE, true);
+        }
+
+        if (JSON.equals(properties.getProperty(FORMAT_KEY))) {
+            properties.put(STRIP_OUT_ARRAY, false);
+        }
+
+        properties.remove(READ_JSON_BY_LINE);
+        properties.remove(COLUMNS_KEY);
+        StringJoiner props = new StringJoiner(",");
+        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+            String key = concatPropPrefix(String.valueOf(entry.getKey()));
+            String value = String.valueOf(entry.getValue());
+            String prop = String.format("'%s'='%s'", key, value);
+            props.add(prop);
+        }
+        sb.append(props).append(")");
+        return sb.toString();
+    }
+
+    static List<String> PREFIX_LIST =
+            Arrays.asList(FIELD_DELIMITER_KEY, LINE_DELIMITER_KEY, STRIP_OUT_ARRAY);
+
+    private String concatPropPrefix(String key) {
+        if (PREFIX_LIST.contains(key)) {
+            return "file." + key;
+        }
+        if (FORMAT_KEY.equals(key)) {
+            return "file.type";
+        }
+        return key;
+    }
+}
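Traced through `buildCopySQL()` above, two uploaded stage files for `db.tbl` with a CSV format property produce roughly the following statement; property order follows `Properties` iteration and is unspecified, and the `executionOptions` object is assumed:

```java
// Sketch: the file labels are the names BatchStageLoad uploaded to the stage.
CopySQLBuilder builder =
        new CopySQLBuilder("db.tbl", executionOptions, Arrays.asList("label_0", "label_1"));
String sql = builder.buildCopySQL();
// e.g. COPY INTO db.tbl FROM @~('{label_0,label_1}') PROPERTIES ('file.type'='csv','copy.async'='false')
```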
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommittable.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommittable.java
new file mode 100644
index 000000000..861fa7841
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommittable.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.copy;
+
+import org.apache.doris.flink.sink.DorisAbstractCommittable;
+
+import java.util.Objects;
+
+public class DorisCopyCommittable implements DorisAbstractCommittable {
+    private final String hostPort;
+    private final String copySQL;
+
+    public DorisCopyCommittable(String hostPort, String copySQL) {
+        this.hostPort = hostPort;
+        this.copySQL = copySQL;
+    }
+
+    public String getHostPort() {
+        return hostPort;
+    }
+
+    public String getCopySQL() {
+        return copySQL;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        DorisCopyCommittable that = (DorisCopyCommittable) o;
+        return Objects.equals(hostPort, that.hostPort) && Objects.equals(copySQL, that.copySQL);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(hostPort, copySQL);
+    }
+
+    @Override
+    public String toString() {
+        return "DorisCopyCommittable{"
+                + "hostPort='"
+                + hostPort
+                + '\''
+                + ", copySQL='"
+                + copySQL
+                + '\''
+                + '}';
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommitter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommitter.java
new file mode 100644
index 000000000..b1b00c7c4
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommitter.java
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.copy;
+
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.util.CollectionUtil;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.exception.CopyLoadException;
+import org.apache.doris.flink.sink.HttpUtil;
+import org.apache.doris.flink.sink.ResponseUtil;
+import org.apache.doris.flink.sink.copy.models.BaseResponse;
+import org.apache.doris.flink.sink.copy.models.CopyIntoResp;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public class DorisCopyCommitter implements Committer<DorisCopyCommittable>, Closeable {
+    private static final Logger LOG = LoggerFactory.getLogger(DorisCopyCommitter.class);
+    private static final String commitPattern = "http://%s/copy/query";
+    private static final int SUCCESS = 0;
+    private static final String FAIL = "1";
+    private ObjectMapper objectMapper = new ObjectMapper();
+    private final CloseableHttpClient httpClient;
+    private final DorisOptions dorisOptions;
+    int maxRetry;
+
+    public DorisCopyCommitter(DorisOptions dorisOptions, int maxRetry) {
+        this(dorisOptions, maxRetry, new HttpUtil().getHttpClientWithTimeout());
+    }
+
+    public DorisCopyCommitter(DorisOptions dorisOptions, int maxRetry, CloseableHttpClient client) {
+        this.dorisOptions = dorisOptions;
+        this.maxRetry = maxRetry;
+        this.httpClient = client;
+    }
+
+    @Override
+    public void commit(Collection<CommitRequest<DorisCopyCommittable>> committableList)
+            throws IOException, InterruptedException {
+        for (CommitRequest<DorisCopyCommittable> committable : committableList) {
+            commitTransaction(committable.getCommittable());
+        }
+    }
+
+    private void commitTransaction(DorisCopyCommittable committable) throws IOException {
+        String hostPort = committable.getHostPort();
+        String copySQL = committable.getCopySQL();
+
+        int statusCode = -1;
+        String reasonPhrase = null;
+        int retry = 0;
+        Map<String, String> params = new HashMap<>();
+        params.put("sql", copySQL);
+        boolean success = false;
+        String loadResult = "";
+        long start = System.currentTimeMillis();
+        while (retry++ <= maxRetry) {
+            LOG.info("commit with copy sql: {}", copySQL);
+            HttpPostBuilder postBuilder = new HttpPostBuilder();
+            postBuilder
+                    .setUrl(String.format(commitPattern, hostPort))
+                    .baseAuth(dorisOptions.getUsername(), dorisOptions.getPassword())
+                    .setEntity(new StringEntity(objectMapper.writeValueAsString(params)));
+
+            try (CloseableHttpResponse response = httpClient.execute(postBuilder.build())) {
+                statusCode = response.getStatusLine().getStatusCode();
+                reasonPhrase = response.getStatusLine().getReasonPhrase();
+                if (statusCode != 200) {
+                    LOG.warn(
+                            "commit failed with status {} {}, reason {}",
+                            statusCode,
+                            hostPort,
+                            reasonPhrase);
+                } else if (response.getEntity() != null) {
+                    loadResult = EntityUtils.toString(response.getEntity());
+                    success = handleCommitResponse(loadResult);
+                    if (success) {
+                        LOG.info(
+                                "commit success cost {}ms, response is {}",
+                                System.currentTimeMillis() - start,
+                                loadResult);
+                        break;
+                    } else {
+                        LOG.warn("commit failed, retry again");
+                    }
+                }
+            } catch (IOException e) {
+                LOG.error("commit error : ", e);
+            }
+        }
+
+        if (!success) {
+            LOG.error(
+                    "commit error with status {}, reason {}, response {}",
+                    statusCode,
+                    reasonPhrase,
+                    loadResult);
+            String copyErrMsg =
+                    String.format(
+                            "commit error, status: %d, reason: %s, response: %s, copySQL: %s",
+                            statusCode, reasonPhrase, loadResult, committable.getCopySQL());
+            throw new CopyLoadException(copyErrMsg);
+        }
+    }
+
+    public boolean handleCommitResponse(String loadResult) throws IOException {
+        BaseResponse baseResponse =
+                objectMapper.readValue(loadResult, new TypeReference<BaseResponse>() {});
+        if (baseResponse.getCode() == SUCCESS && baseResponse.getData() instanceof Map) {
+            CopyIntoResp dataResp =
+                    objectMapper.convertValue(baseResponse.getData(), CopyIntoResp.class);
+            // Returning code means there is an exception, and returning result means success
+            if (FAIL.equals(dataResp.getDataCode())) {
+                LOG.error("copy into execute failed, reason:{}", loadResult);
+                return false;
+            } else {
+                Map<String, String> result = dataResp.getResult();
+                if (CollectionUtil.isNullOrEmpty(result)
+                        || (!result.get("state").equals("FINISHED")
+                                && !ResponseUtil.isCommitted(result.get("msg")))) {
+                    LOG.error("copy into load failed, reason:{}", loadResult);
+                    return false;
+                } else {
+                    return true;
+                }
+            }
+        } else {
+            LOG.error("commit failed, reason:{}", loadResult);
+            return false;
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (httpClient != null) {
+            httpClient.close();
+        }
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyWriter.java
new file mode 100644
index 000000000..136b804e9
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyWriter.java
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.copy;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.writer.DorisAbstractWriter;
+import org.apache.doris.flink.sink.writer.DorisWriterState;
+import org.apache.doris.flink.sink.writer.LabelGenerator;
+import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
+import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class DorisCopyWriter<IN>
+        implements DorisAbstractWriter<IN, DorisWriterState, DorisCopyCommittable> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DorisCopyWriter.class);
+    private BatchStageLoad batchStageLoad;
+    private final DorisOptions dorisOptions;
+    private final DorisReadOptions dorisReadOptions;
+    private final DorisExecutionOptions executionOptions;
+    private final String labelPrefix;
+    private final LabelGenerator labelGenerator;
+    private final long flushIntervalMs;
+    private final DorisRecordSerializer<IN> serializer;
+    private final transient ScheduledExecutorService scheduledExecutorService;
+    private transient volatile Exception flushException = null;
+    private String database;
+    private String table;
+
+    public DorisCopyWriter(
+            Sink.InitContext initContext,
+            DorisRecordSerializer<IN> serializer,
+            DorisOptions dorisOptions,
+            DorisReadOptions dorisReadOptions,
+            DorisExecutionOptions executionOptions) {
+        if (!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())) {
+            String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
+            Preconditions.checkState(
+                    tableInfo.length == 2,
+                    "tableIdentifier input error, the format is database.table");
+            this.database = tableInfo[0];
+            this.table = tableInfo[1];
+        }
+        LOG.info("labelPrefix " + executionOptions.getLabelPrefix());
+        this.labelPrefix =
+                executionOptions.getLabelPrefix()
+                        + "_"
+                        + UUID.randomUUID().toString().replaceAll("-", "");
+        this.labelGenerator = new LabelGenerator(labelPrefix, false, initContext.getSubtaskId());
+        this.scheduledExecutorService =
+                new ScheduledThreadPoolExecutor(
+                        1, new ExecutorThreadFactory("copy-upload-interval"));
+        this.serializer = serializer;
+        this.dorisOptions = dorisOptions;
+        this.dorisReadOptions = dorisReadOptions;
+        this.executionOptions = executionOptions;
+        this.flushIntervalMs = executionOptions.getBufferFlushIntervalMs();
+        serializer.initial();
+        initializeLoad();
+    }
+
+    public void initializeLoad() {
+        this.batchStageLoad =
+                new BatchStageLoad(
+                        dorisOptions, dorisReadOptions, executionOptions, labelGenerator);
+        // when uploading data in streaming mode, flush periodically; this also surfaces
+        // any exception raised by the async upload thread
+        scheduledExecutorService.scheduleWithFixedDelay(
+                this::intervalFlush, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
+    }
+
+    private void intervalFlush() {
+        try {
+            LOG.info("interval flush triggered.");
+            batchStageLoad.flush(null, false);
+        } catch (InterruptedException e) {
+            flushException = e;
+        }
+    }
+
+    @Override
+    public void write(IN in, Context context) throws IOException, InterruptedException {
+        checkFlushException();
+        writeOneDorisRecord(serializer.serialize(in));
+    }
+
+    public void writeOneDorisRecord(DorisRecord record) throws InterruptedException {
+        if (record == null || record.getRow() == null) {
+            // ddl or value is null
+            return;
+        }
+        String db = this.database;
+        String tbl = this.table;
+        // multi table load
+        if (record.getTableIdentifier() != null) {
+            db = record.getDatabase();
+            tbl = record.getTable();
+        }
+        batchStageLoad.writeRecord(db, tbl, record.getRow());
+    }
+
+    @Override
+    public void flush(boolean b) throws IOException, InterruptedException {
+        checkFlushException();
+        writeOneDorisRecord(serializer.flush());
+        LOG.info("checkpoint flush triggered.");
+        batchStageLoad.flush(null, true);
+    }
+
+    @Override
+    public Collection<DorisCopyCommittable> prepareCommit()
+            throws IOException, InterruptedException {
+        Preconditions.checkState(batchStageLoad != null);
+        LOG.info("checkpoint arrived, upload buffer to storage");
+        List<DorisCopyCommittable> committables = new ArrayList<>();
+        Map<String, List<String>> fileListMap = this.batchStageLoad.getFileListMap();
+        for (Map.Entry<String, List<String>> entry : fileListMap.entrySet()) {
+            String tableIdentifier = entry.getKey();
+            List<String> fileList = entry.getValue();
+            CopySQLBuilder copySQLBuilder =
+                    new CopySQLBuilder(tableIdentifier, executionOptions, fileList);
+            String copySql = copySQLBuilder.buildCopySQL();
+            committables.add(new DorisCopyCommittable(dorisOptions.getFenodes(), copySql));
+        }
+        return committables;
+    }
+
+    @Override
+    public List<DorisWriterState> snapshotState(long checkpointId) throws IOException {
+        Preconditions.checkState(batchStageLoad != null);
+        if (!batchStageLoad.getFileListMap().isEmpty()) {
+            LOG.info("clear the file list {}", batchStageLoad.getFileListMap());
+            this.batchStageLoad.clearFileListMap();
+        }
+
+        this.batchStageLoad.setCurrentCheckpointID(checkpointId + 1);
+        // Files will be automatically cleaned
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void close() throws Exception {
+        LOG.info("DorisCopyWriter Close");
+        if (scheduledExecutorService != null) {
+            scheduledExecutorService.shutdownNow();
+        }
+        batchStageLoad.close();
+    }
+
+    private void checkFlushException() {
+        if (flushException != null) {
+            throw new RuntimeException("Writing records to copy load failed.", flushException);
+        }
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/HttpPostBuilder.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/HttpPostBuilder.java
new file mode 100644
index 000000000..657e2611e
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/HttpPostBuilder.java
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.copy;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.HttpPost;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Builder for HttpPost. */
+public class HttpPostBuilder {
+    String url;
+    Map<String, String> header;
+    HttpEntity httpEntity;
+
+    public HttpPostBuilder() {
+        header = new HashMap<>();
+    }
+
+    public HttpPostBuilder setUrl(String url) {
+        this.url = url;
+        return this;
+    }
+
+    public HttpPostBuilder addCommonHeader() {
+        header.put(HttpHeaders.EXPECT, "100-continue");
+        return this;
+    }
+
+    public HttpPostBuilder baseAuth(String user, String password) {
+        final String authInfo = user + ":" + password;
+        byte[] encoded = Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8));
+        header.put(HttpHeaders.AUTHORIZATION, "Basic " + new String(encoded));
+        return this;
+    }
+
+    public HttpPostBuilder setEntity(HttpEntity httpEntity) {
+        this.httpEntity = httpEntity;
+        return this;
+    }
+
+    public HttpPost build() {
+        Preconditions.checkNotNull(url);
+        Preconditions.checkNotNull(httpEntity);
+        HttpPost post = new HttpPost(url);
+        header.forEach(post::setHeader);
+        post.setEntity(httpEntity);
+        return post;
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/models/BaseResponse.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/models/BaseResponse.java
new file mode 100644
index 000000000..4cd5cfa56
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/models/BaseResponse.java
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.copy.models;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class BaseResponse<T> {
+    private int code;
+    private String msg;
+    private T data;
+    private int count;
+
+    public int getCode() {
+        return code;
+    }
+
+    public String getMsg() {
+        return msg;
+    }
+
+    public T getData() {
+        return data;
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/models/CopyIntoResp.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/models/CopyIntoResp.java
new file mode 100644
index 000000000..e69321f35
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/models/CopyIntoResp.java
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.copy.models;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+import java.util.Map;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CopyIntoResp extends BaseResponse {
+    private String code;
+    private String exception;
+
+    private Map<String, String> result;
+
+    public String getDataCode() {
+        return code;
+    }
+
+    public String getException() {
+        return exception;
+    }
+
+    public Map<String, String> getResult() {
+        return result;
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
index 312dd8fda..91eaf9e2e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
@@ -41,6 +41,12 @@ public LabelGenerator(
         this.subtaskId = subtaskId;
     }
 
+    public LabelGenerator(String labelPrefix, boolean enable2PC, int subtaskId) {
+        this.labelPrefix = labelPrefix;
+        this.enable2PC = enable2PC;
+        this.subtaskId = subtaskId;
+    }
+
     public String generateLabel(long chkId) {
         String label = String.format("%s_%s_%s", labelPrefix, subtaskId, chkId);
         return enable2PC ? label : label + "_" + UUID.randomUUID();
@@ -59,4 +65,8 @@ public String generateBatchLabel() {
     public String generateBatchLabel(String table) {
         return String.format("%s_%s_%s", labelPrefix, table, UUID.randomUUID());
     }
+
+    public String generateCopyBatchLabel(String table, long chkId, int fileNum) {
+        return String.format("%s_%s_%s_%s_%s", labelPrefix, table, subtaskId, chkId, fileNum);
+    }
 }
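The generated copy-batch label doubles as the file name uploaded to the internal stage, so it encodes prefix, table, subtask, checkpoint, and file counter. A quick sketch of the resulting format, with illustrative values:

```java
// Uses the new three-argument constructor added above.
LabelGenerator generator = new LabelGenerator("job_ab12", false, 0);
String label = generator.generateCopyBatchLabel("tbl", 7, 2);
// -> "job_ab12_tbl_0_7_2"
```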
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
index e71ed51dd..df822d408 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java
@@ -34,6 +34,7 @@
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.sink.DorisSink;
+import org.apache.doris.flink.sink.writer.WriteMode;
 import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
 import org.apache.doris.flink.table.DorisConfigOptions;
 import org.slf4j.Logger;
@@ -276,6 +277,9 @@ public DorisSink<String> buildDorisSink(String table) {
         sinkConfig
                 .getOptional(DorisConfigOptions.SINK_USE_CACHE)
                 .ifPresent(executionBuilder::setUseCache);
+        sinkConfig
+                .getOptional(DorisConfigOptions.SINK_WRITE_MODE)
+                .ifPresent(v -> executionBuilder.setWriteMode(WriteMode.of(v)));
         DorisExecutionOptions executionOptions = executionBuilder.build();
 
         builder.setDorisReadOptions(DorisReadOptions.builder().build())