-
Notifications
You must be signed in to change notification settings - Fork 428
[BAHIR-234] Add ClickHouse Connector for Flink #85
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @pyscala for the contribution :)
I have added a few comments.
</dependency> | ||
</dependencies> | ||
|
||
<build> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this are in parent pom no?
...main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseAppendSinkFunction.java
Outdated
Show resolved
Hide resolved
...main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseAppendSinkFunction.java
Outdated
Show resolved
Hide resolved
/** | ||
* Created by liufangliang on 2020/4/16. | ||
*/ | ||
public class ClickHouseTableSinkTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all methods are empty
flink-connector-clickhouse/src/test/java/com/apache/flink/table/descriptors/ClickHouseTest.java
Outdated
Show resolved
Hide resolved
...tor-clickhouse/src/test/java/com/apache/flink/table/descriptors/ClickHouseValidatorTest.java
Outdated
Show resolved
Hide resolved
@eskabetxe Is there remained concern on the PR? I'd love to see the PR merged so that I can use it in work :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should revise your tests, they appear to be executed with a local clickhouse instance
flink-connector-clickhouse/pom.xml
Outdated
<dependency> | ||
<groupId>ru.yandex.clickhouse</groupId> | ||
<artifactId>clickhouse-jdbc</artifactId> | ||
<version>0.1.50</version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be in properties so user can change this version
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for your reply,i will fix it later.
|
||
public class ClickHouseAppendSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction { | ||
private static final String USERNAME = "user"; | ||
private static final String PASSWORD = "password"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
identation
try { | ||
Thread.sleep(retryInterval); | ||
} catch (InterruptedException e) { | ||
e.printStackTrace(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if needed log the exception
try { | ||
copy = new ClickHouseTableSink(address, username, password, database, table, schema, batchSize, commitPadding, retries, retryInterval, ignoreInsertError); | ||
} catch (Exception e) { | ||
throw new RuntimeException(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a error message
String[] fieldNames = tableSchema.getFieldNames(); | ||
String columns = String.join(",", fieldNames); | ||
String[] questionMark = new String[fieldNames.length]; | ||
for (int i = 0; i < questionMark.length; i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this could be change to a stream map no?
String questionMark = Arrays.stream(fieldNames)
.map(field -> "?")
.reduce((left,right) -> left+","+right)
.get();
private PreparedStatement pstat; | ||
|
||
@Test | ||
public void open() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
empty test
for (int i = 0; i < value.getArity(); i++) { | ||
pstat.setObject(i + 1, value.getField(i)); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the test?
.retryInterval(3000L); | ||
Map<String, String> connectorProperties = clickhouse.toConnectorProperties(); | ||
for (Map.Entry<String, String> entry : connectorProperties.entrySet()) { | ||
System.out.println(entry.getKey() + ":" + entry.getValue()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should test that the map is what you expect and not print it to console
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@eskabetxe thanks for your reply, related optimization has been completed, review again.
|
||
@Test | ||
public void createStreamTableSink() throws Exception { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you are testing with local clickhouse?
we should have a testcontainer that create a clickhouse instance to test
Implement Streaming ClickHouseSink,support Flink Table API & Flink SQL for ClickHouse connector
@eskabetxe Optimization complete, looking forward to your reply. |
How is it going? How to contribute code? |
Is this PR still alive? |
Implement Streaming ClickHouseSink,support Flink Table API & Flink SQL
for ClickHouse connector