Skip to content

Commit

Permalink
[Improve]Added direct access to BE through the intranet (apache#187)
Browse files Browse the repository at this point in the history
  • Loading branch information
DongLiang-0 authored Sep 5, 2023
1 parent 9831982 commit 8c15c4f
Show file tree
Hide file tree
Showing 18 changed files with 236 additions and 25 deletions.
8 changes: 7 additions & 1 deletion flink-doris-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,13 @@ under the License.
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.27.0</version>
<version>4.2.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>4.2.0</version>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,23 @@ public class DorisConnectionOptions implements Serializable {
protected final String username;
protected final String password;
protected String jdbcUrl;
protected String benodes;

public DorisConnectionOptions(String fenodes, String username, String password) {
this.fenodes = Preconditions.checkNotNull(fenodes, "fenodes is empty");
this.username = username;
this.password = password;
}

public DorisConnectionOptions(String fenodes, String username, String password, String jdbcUrl){
this(fenodes,username,password);
public DorisConnectionOptions(String fenodes, String username, String password, String jdbcUrl) {
this(fenodes, username, password);
this.jdbcUrl = jdbcUrl;
}

public DorisConnectionOptions(String fenodes, String benodes, String username, String password,
String jdbcUrl) {
this(fenodes, username, password);
this.benodes = benodes;
this.jdbcUrl = jdbcUrl;
}

Expand All @@ -55,6 +63,10 @@ public String getPassword() {
return password;
}

public String getBenodes() {
return benodes;
}

public String getJdbcUrl(){
return jdbcUrl;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ public DorisOptions(String fenodes, String username, String password, String tab
this.tableIdentifier = tableIdentifier;
}

public DorisOptions(String fenodes, String beNodes, String username, String password,
String tableIdentifier, String jdbcUrl) {
super(fenodes, beNodes, username, password, jdbcUrl);
this.tableIdentifier = tableIdentifier;
}

public String getTableIdentifier() {
return tableIdentifier;
}
Expand All @@ -60,7 +66,7 @@ public static Builder builder() {
*/
public static class Builder {
private String fenodes;

private String benodes;
private String jdbcUrl;
private String username;
private String password;
Expand Down Expand Up @@ -98,6 +104,14 @@ public Builder setFenodes(String fenodes) {
return this;
}

/**
* optional, Backend Http Port
*/
public Builder setBenodes(String benodes) {
this.benodes = benodes;
return this;
}

/**
* not required, fe jdbc url, for lookup query
*/
Expand All @@ -109,9 +123,8 @@ public Builder setJdbcUrl(String jdbcUrl) {
public DorisOptions build() {
checkNotNull(fenodes, "No fenodes supplied.");
checkNotNull(tableIdentifier, "No tableIdentifier supplied.");
return new DorisOptions(fenodes, username, password, tableIdentifier, jdbcUrl);
return new DorisOptions(fenodes, benodes, username, password, tableIdentifier, jdbcUrl);
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@

import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.rest.models.BackendV2;
import org.apache.doris.flink.rest.models.BackendV2.BackendRowV2;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BackendUtil {
Expand All @@ -36,13 +40,34 @@ public BackendUtil(List<BackendV2.BackendRowV2> backends) {
this.pos = 0;
}

public BackendUtil(String beNodes) {
this.backends = initBackends(beNodes);
this.pos = 0;
}

private List<BackendV2.BackendRowV2> initBackends(String beNodes) {
List<BackendV2.BackendRowV2> backends = new ArrayList<>();
List<String> nodes = Arrays.asList(beNodes.split(","));
nodes.forEach(node -> {
if (tryHttpConnection(node)) {
node = node.trim();
String[] ipAndPort = node.split(":");
BackendRowV2 backendRowV2 = new BackendRowV2();
backendRowV2.setIp(ipAndPort[0]);
backendRowV2.setHttpPort(Integer.parseInt(ipAndPort[1]));
backendRowV2.setAlive(true);
backends.add(backendRowV2);
}
});
return backends;
}

public String getAvailableBackend() {
long tmp = pos + backends.size();
while (pos < tmp) {
BackendV2.BackendRowV2 backend = backends.get((int) (pos % backends.size()));
BackendV2.BackendRowV2 backend = backends.get((int) (pos++ % backends.size()));
String res = backend.toBackendString();
if(tryHttpConnection(res)){
pos++;
if (tryHttpConnection(res)) {
return res;
}
}
Expand All @@ -60,7 +85,6 @@ public boolean tryHttpConnection(String backend) {
return true;
} catch (Exception ex) {
LOG.warn("Failed to connect to backend:{}", backend, ex);
pos++;
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.HttpUtil;
import org.apache.doris.flink.sink.writer.LabelGenerator;

import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
Expand Down Expand Up @@ -93,7 +95,9 @@ public DorisBatchStreamLoad(DorisOptions dorisOptions,
DorisReadOptions dorisReadOptions,
DorisExecutionOptions executionOptions,
LabelGenerator labelGenerator) {
this.backendUtil = new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) ? new BackendUtil(
dorisOptions.getBenodes())
: new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
this.hostPort = backendUtil.getAvailableBackend();
String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
this.db = tableInfo[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.flink.sink.committer;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.connector.sink.Committer;

import com.fasterxml.jackson.core.type.TypeReference;
Expand All @@ -25,6 +26,7 @@
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.sink.BackendUtil;
import org.apache.doris.flink.sink.DorisCommittable;
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.HttpUtil;
Expand Down Expand Up @@ -55,6 +57,7 @@ public class DorisCommitter implements Committer<DorisCommittable> {
private final DorisOptions dorisOptions;
private final DorisReadOptions dorisReadOptions;
private final ObjectMapper jsonMapper = new ObjectMapper();
private final BackendUtil backendUtil;

int maxRetry;

Expand All @@ -67,6 +70,9 @@ public DorisCommitter(DorisOptions dorisOptions, DorisReadOptions dorisReadOptio
this.dorisReadOptions = dorisReadOptions;
this.maxRetry = maxRetry;
this.httpClient = client;
this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) ? new BackendUtil(
dorisOptions.getBenodes())
: new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
}

@Override
Expand Down Expand Up @@ -116,13 +122,13 @@ private void commitTransaction(DorisCommittable committable) throws IOException
if (retry == maxRetry) {
throw new DorisRuntimeException("stream load error: " + reasonPhrase);
}
hostPort = RestService.getBackend(dorisOptions, dorisReadOptions, LOG);
hostPort = backendUtil.getAvailableBackend();
} catch (IOException e) {
LOG.error("commit transaction failed: ", e);
if (retry == maxRetry) {
throw new IOException("commit transaction failed: {}", e);
}
hostPort = RestService.getBackend(dorisOptions, dorisReadOptions, LOG);
hostPort = backendUtil.getAvailableBackend();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.doris.flink.sink.BackendUtil;
import org.apache.doris.flink.sink.DorisCommittable;
import org.apache.doris.flink.sink.HttpUtil;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
Expand Down Expand Up @@ -99,8 +101,9 @@ public DorisWriter(Sink.InitContext initContext,
}

public void initializeLoad(List<DorisWriterState> state) throws IOException {
//cache backend
backendUtil = new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) ? new BackendUtil(
dorisOptions.getBenodes())
: new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
try {
this.dorisStreamLoad = new DorisStreamLoad(
backendUtil.getAvailableBackend(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class DorisConfigOptions {
public static final String IDENTIFIER = "doris";
// common option
public static final ConfigOption<String> FENODES = ConfigOptions.key("fenodes").stringType().noDefaultValue().withDescription("doris fe http address.");
public static final ConfigOption<String> BENODES = ConfigOptions.key("benodes").stringType().noDefaultValue().withDescription("doris be http address.");
public static final ConfigOption<String> TABLE_IDENTIFIER = ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the doris table name.");
public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the doris user name.");
public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the doris password.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.doris.flink.cfg.DorisLookupOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;

import static org.apache.doris.flink.table.DorisConfigOptions.BENODES;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
Expand Down Expand Up @@ -103,6 +105,7 @@ public Set<ConfigOption<?>> requiredOptions() {
public Set<ConfigOption<?>> optionalOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(FENODES);
options.add(BENODES);
options.add(TABLE_IDENTIFIER);
options.add(USERNAME);
options.add(PASSWORD);
Expand Down Expand Up @@ -169,8 +172,10 @@ public DynamicTableSource createDynamicTableSource(Context context) {

private DorisOptions getDorisOptions(ReadableConfig readableConfig) {
final String fenodes = readableConfig.get(FENODES);
final String benodes = readableConfig.get(BENODES);
final DorisOptions.Builder builder = DorisOptions.builder()
.setFenodes(fenodes)
.setBenodes(benodes)
.setJdbcUrl(readableConfig.get(JDBC_URL))
.setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
}
DorisRowDataInputFormat.Builder builder = DorisRowDataInputFormat.builder()
.setFenodes(options.getFenodes())
.setBenodes(options.getBenodes())
.setUsername(options.getUsername())
.setPassword(options.getPassword())
.setTableIdentifier(options.getTableIdentifier())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,11 @@ public Builder setFenodes(String fenodes) {
return this;
}

public Builder setBenodes(String benodes) {
this.optionsBuilder.setBenodes(benodes);
return this;
}

public Builder setUsername(String username) {
this.optionsBuilder.setUsername(username);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,15 @@ private DorisConnectionOptions getDorisConnectionOptions() {
*/
public DorisSink<String> buildDorisSink(String table) {
String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES);
String benodes = sinkConfig.getString(DorisConfigOptions.BENODES);
String user = sinkConfig.getString(DorisConfigOptions.USERNAME);
String passwd = sinkConfig.getString(DorisConfigOptions.PASSWORD, "");
String labelPrefix = sinkConfig.getString(DorisConfigOptions.SINK_LABEL_PREFIX);

DorisSink.Builder<String> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes(fenodes)
.setBenodes(benodes)
.setTableIdentifier(database + "." + table)
.setUsername(user)
.setPassword(passwd);
Expand Down
Loading

0 comments on commit 8c15c4f

Please sign in to comment.