Skip to content

Commit

Permalink
[core] Remove log system options (#2305)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Nov 13, 2023
1 parent 536a616 commit c9cf20a
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 54 deletions.
24 changes: 0 additions & 24 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -213,30 +213,6 @@
<td>Integer</td>
<td>The maximal fan-in for external merge sort. It limits the number of file handles. If it is too small, may cause intermediate merging. But if it is too large, it will cause too many files opened at the same time, consume memory and lead to random reading.</td>
</tr>
<tr>
<td><h5>log.changelog-mode</h5></td>
<td style="word-wrap: break-word;">auto</td>
<td><p>Enum</p></td>
<td>Specify the log changelog mode for table.<br /><br />Possible values:<ul><li>"auto": Upsert for table with primary key, all for table without primary key.</li><li>"all": The log system stores all changes including UPDATE_BEFORE.</li><li>"upsert": The log system does not store the UPDATE_BEFORE changes, the log consumed job will automatically add the normalized node, relying on the state to generate the required update_before.</li></ul></td>
</tr>
<tr>
<td><h5>log.consistency</h5></td>
<td style="word-wrap: break-word;">transactional</td>
<td><p>Enum</p></td>
<td>Specify the log consistency mode for table.<br /><br />Possible values:<ul><li>"transactional": Only the data after the checkpoint can be seen by readers, the latency depends on checkpoint interval.</li><li>"eventual": Immediate data visibility, you may see some intermediate states, but eventually the right results will be produced, only works for table with primary key.</li></ul></td>
</tr>
<tr>
<td><h5>log.format</h5></td>
<td style="word-wrap: break-word;">"debezium-json"</td>
<td>String</td>
<td>Specify the message format of log system.</td>
</tr>
<tr>
<td><h5>log.key.format</h5></td>
<td style="word-wrap: break-word;">"json"</td>
<td>String</td>
<td>Specify the key message format of log system with primary key.</td>
</tr>
<tr>
<td><h5>lookup.cache-file-retention</h5></td>
<td style="word-wrap: break-word;">1 h</td>
Expand Down
12 changes: 0 additions & 12 deletions docs/layouts/shortcodes/generated/flink_catalog_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,5 @@
<td>Boolean</td>
<td>If true, creating table in default database is not allowed. Default is false.</td>
</tr>
<tr>
<td><h5>log.system.auto-register</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If true, the register will automatically create and delete a topic in log system for Paimon table. Default kafka log store register is supported, users can implement customized register for log system, for example, create a new class which extends KafkaLogStoreFactory and return a customized LogStoreRegister for their kafka cluster to create/delete topics.</td>
</tr>
<tr>
<td><h5>log.system.auto-register-timeout</h5></td>
<td style="word-wrap: break-word;">1 min</td>
<td>Duration</td>
<td>The timeout for register to create or delete topic in log system.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,6 @@
<td>Boolean</td>
<td>When changelog-producer is set to LOOKUP, commit will wait for changelog generation by lookup.</td>
</tr>
<tr>
<td><h5>log.system</h5></td>
<td style="word-wrap: break-word;">"none"</td>
<td>String</td>
<td>The log system used to keep changes of the table.<br /><br />Possible values:<br /><ul><li>"none": No log system, the data is written only to file store, and the streaming read will be directly read from the file store.</li></ul><ul><li>"kafka": Kafka log system, the data is double written to file store and kafka, and the streaming read will be read from kafka. If streaming read from file, configures streaming-read-mode to file.</li></ul></td>
</tr>
<tr>
<td><h5>log.system.partitions</h5></td>
<td style="word-wrap: break-word;">1</td>
<td>Integer</td>
<td>The number of partitions of the log system. If log system is kafka, this is kafka partitions.</td>
</tr>
<tr>
<td><h5>log.system.replication</h5></td>
<td style="word-wrap: break-word;">1</td>
<td>Integer</td>
<td>The number of replication of the log system. If log system is kafka, this is kafka replicationFactor.</td>
</tr>
<tr>
<td><h5>lookup.async</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,25 +477,29 @@ public class CoreOptions implements Serializable {
+ "Note: Scale-up this parameter will increase memory usage while scanning manifest files. "
+ "We can consider downsize it when we encounter an out of memory exception while scanning");

// Consistency mode of the external log system for this table: TRANSACTIONAL
// (default, data visible only after checkpoint) or EVENTUAL.
// Hidden from generated docs in this commit because the option is confusing
// when no log system is configured.
@ExcludeFromDocumentation("Confused without log system")
public static final ConfigOption<LogConsistency> LOG_CONSISTENCY =
key("log.consistency")
.enumType(LogConsistency.class)
.defaultValue(LogConsistency.TRANSACTIONAL)
.withDescription("Specify the log consistency mode for table.");

// Changelog mode of the external log system: AUTO (default), ALL, or UPSERT
// (per the LogChangelogMode enum; exact semantics are described in the removed
// generated docs above).
// Hidden from generated docs: confusing when no log system is configured.
@ExcludeFromDocumentation("Confused without log system")
public static final ConfigOption<LogChangelogMode> LOG_CHANGELOG_MODE =
key("log.changelog-mode")
.enumType(LogChangelogMode.class)
.defaultValue(LogChangelogMode.AUTO)
.withDescription("Specify the log changelog mode for table.");

// Serialization format of key messages written to the log system for tables
// with a primary key. Defaults to "json".
// Hidden from generated docs: confusing when no log system is configured.
@ExcludeFromDocumentation("Confused without log system")
public static final ConfigOption<String> LOG_KEY_FORMAT =
key("log.key.format")
.stringType()
.defaultValue("json")
.withDescription(
"Specify the key message format of log system with primary key.");

@ExcludeFromDocumentation("Confused without log system")
public static final ConfigOption<String> LOG_FORMAT =
key("log.format")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.flink;

import org.apache.paimon.annotation.Documentation.ExcludeFromDocumentation;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
Expand All @@ -32,6 +33,7 @@ public class FlinkCatalogOptions {
.stringType()
.defaultValue(Catalog.DEFAULT_DATABASE);

@ExcludeFromDocumentation("Confused without log system")
public static final ConfigOption<Boolean> LOG_SYSTEM_AUTO_REGISTER =
ConfigOptions.key("log.system.auto-register")
.booleanType()
Expand All @@ -41,12 +43,14 @@ public class FlinkCatalogOptions {
+ "is supported, users can implement customized register for log system, for example, create a new class which extends "
+ "KafkaLogStoreFactory and return a customized LogStoreRegister for their kafka cluster to create/delete topics.");

// How long the log-store register may take to create or delete a topic
// (e.g. a Kafka topic) before timing out. Defaults to 1 minute.
// Hidden from generated docs: confusing when no log system is configured.
@ExcludeFromDocumentation("Confused without log system")
public static final ConfigOption<Duration> REGISTER_TIMEOUT =
ConfigOptions.key("log.system.auto-register-timeout")
.durationType()
.defaultValue(Duration.ofMinutes(1))
.withDescription(
"The timeout for register to create or delete topic in log system.");

public static final ConfigOption<Boolean> DISABLE_CREATE_TABLE_IN_DEFAULT_DB =
ConfigOptions.key("disable-create-table-in-default-db")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class FlinkConnectorOptions {

// Prefix used to recognize Paimon table dynamic options (e.g. options passed
// via SQL hints) in the Flink connector.
public static final String TABLE_DYNAMIC_OPTION_PREFIX = "paimon";

@ExcludeFromDocumentation("Confused without log system")
public static final ConfigOption<String> LOG_SYSTEM =
ConfigOptions.key("log.system")
.stringType()
Expand All @@ -70,13 +71,15 @@ public class FlinkConnectorOptions {
+ "."))
.build());

// Number of partitions for the table's log system topic; when the log system
// is Kafka this maps to the Kafka partition count. Defaults to 1.
// Hidden from generated docs: confusing when no log system is configured.
@ExcludeFromDocumentation("Confused without log system")
public static final ConfigOption<Integer> LOG_SYSTEM_PARTITIONS =
ConfigOptions.key("log.system.partitions")
.intType()
.defaultValue(1)
.withDescription(
"The number of partitions of the log system. If log system is kafka, this is kafka partitions.");

@ExcludeFromDocumentation("Confused without log system")
public static final ConfigOption<Integer> LOG_SYSTEM_REPLICATION =
ConfigOptions.key("log.system.replication")
.intType()
Expand Down

0 comments on commit c9cf20a

Please sign in to comment.