Merge pull request #62 from rayokota/upgrade-kcache-3.3.1
Upgrade to KCache 3.3.1
rayokota authored Sep 12, 2020
2 parents b6f202b + 8e0fd6d commit 587199f
Showing 13 changed files with 39 additions and 61 deletions.
7 changes: 4 additions & 3 deletions README.md
@@ -70,7 +70,7 @@ properties.put("schemaFactory", "io.kareldb.schema.SchemaFactory");
properties.put("parserFactory", "org.apache.calcite.sql.parser.parserextension.ExtensionSqlParserImpl#FACTORY");
properties.put("schema.kind", "io.kareldb.kafka.KafkaSchema");
properties.put("schema.kafkacache.bootstrap.servers", bootstrapServers);
properties.put("schema.rocksdb.root.dir", "/tmp");
properties.put("schema.kafkacache.data.dir", "/tmp");

try (Connection conn = DriverManager.getConnection("jdbc:kareldb:", properties);
Statement s = conn.createStatement()) {
@@ -125,8 +125,9 @@ KarelDB has a number of configuration properties that can be specified. When us
- `listeners` - List of listener URLs that include the scheme, host, and port. Defaults to `http://0.0.0.0:8765`.
- `cluster.group.id` - The group ID to be used for leader election. Defaults to `kareldb`.
- `leader.eligibility` - Whether this node can participate in leader election. Defaults to true.
-- `rocksdb.enable` - Whether to use RocksDB in KCache. Defaults to true. Otherwise an in-memory cache is used.
-- `rocksdb.root.dir` - The root directory for RocksDB storage. Defaults to `/tmp`.
+- `kafkacache.backing.cache` - The backing cache for KCache, one of `memory` (default), `bdbje`, `lmdb`, or `rocksdb`.
+- `kafkacache.data.dir` - The root directory for backing cache storage. Defaults to `/tmp`.
- `kafkacache.bootstrap.servers` - A list of host and port pairs to use for establishing the initial connection to Kafka.
- `kafkacache.group.id` - The group ID to use for the internal consumers, which needs to be unique for each node. Defaults to `kareldb-1`.
- `kafkacache.topic.replication.factor` - The replication factor for the internal topics created by KarelDB. Defaults to 3.
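For reference, the renamed settings can be exercised through the same embedded JDBC entry point the README already shows. This is a minimal sketch, not part of the diff: the bootstrap server, the backing-cache choice, and the DDL statement are illustrative, and `schema.kafkacache.backing.cache` mirrors the test change further down in this commit.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.Properties;

public class KarelDbEmbeddedSketch {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put("schemaFactory", "io.kareldb.schema.SchemaFactory");
        properties.put("parserFactory",
            "org.apache.calcite.sql.parser.parserextension.ExtensionSqlParserImpl#FACTORY");
        properties.put("schema.kind", "io.kareldb.kafka.KafkaSchema");
        properties.put("schema.kafkacache.bootstrap.servers", "localhost:9092"); // illustrative
        // New keys from this commit; the old rocksdb.* keys no longer apply.
        properties.put("schema.kafkacache.backing.cache", "rocksdb");            // memory | bdbje | lmdb | rocksdb
        properties.put("schema.kafkacache.data.dir", "/tmp");

        // Same JDBC entry point as the README snippet above.
        try (Connection conn = DriverManager.getConnection("jdbc:kareldb:", properties);
             Statement s = conn.createStatement()) {
            s.execute("CREATE TABLE books (id INT, title VARCHAR)"); // hypothetical table
        }
    }
}
```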
8 changes: 4 additions & 4 deletions config/kareldb-inmemory.properties
@@ -25,11 +25,11 @@ kafkacache.bootstrap.servers=localhost:9092
# The group ID to be used for internal consumers, which needs to be unique for each node.
kafkacache.group.id=kareldb-1

-# Whether to enable RocksDB in KCache
-rocksdb.enable=false
+# The backing cache
+kafkacache.backing.cache=memory

-# The root directory for RocksDB storage
-rocksdb.root.dir=/tmp
+# The root directory for backing cache storage
+kafkacache.data.dir=/tmp

# If true, API requests that fail will include extra debugging information, including stack traces
debug=false
8 changes: 4 additions & 4 deletions config/kareldb-replica.properties
@@ -25,11 +25,11 @@ kafkacache.bootstrap.servers=localhost:9092
# The group ID to be used for internal consumers, which needs to be unique for each node.
kafkacache.group.id=kareldb-2

-# Whether to enable RocksDB in KCache
-rocksdb.enable=true
+# The backing cache
+kafkacache.backing.cache=rocksdb

-# The root directory for RocksDB storage
-rocksdb.root.dir=/tmp/replica
+# The root directory for backing cache storage
+kafkacache.data.dir=/tmp/replica

# If true, API requests that fail will include extra debugging information, including stack traces
debug=false
8 changes: 4 additions & 4 deletions config/kareldb-secure-replica.properties
@@ -25,11 +25,11 @@ kafkacache.bootstrap.servers=localhost:9092
# The group ID to be used for internal consumers, which needs to be unique for each node.
kafkacache.group.id=kareldb-2

-# Whether to enable RocksDB in KCache
-rocksdb.enable=true
+# The backing cache
+kafkacache.backing.cache=rocksdb

-# The root directory for RocksDB storage
-rocksdb.root.dir=/tmp/replica
+# The root directory for backing cache storage
+kafkacache.data.dir=/tmp/replica

ssl.keystore.location=custom.keystore
ssl.keystore.password=changeme
8 changes: 4 additions & 4 deletions config/kareldb-secure.properties
@@ -25,11 +25,11 @@ kafkacache.bootstrap.servers=localhost:9092
# The group ID to be used for internal consumers, which needs to be unique for each node.
kafkacache.group.id=kareldb-1

-# Whether to enable RocksDB in KCache
-rocksdb.enable=true
+# The backing cache
+kafkacache.backing.cache=rocksdb

-# The root directory for RocksDB storage
-rocksdb.root.dir=/tmp
+# The root directory for backing cache storage
+kafkacache.data.dir=/tmp

ssl.keystore.location=custom.keystore
ssl.keystore.password=changeme
8 changes: 4 additions & 4 deletions config/kareldb.properties
@@ -25,11 +25,11 @@ kafkacache.bootstrap.servers=localhost:9092
# The group ID to be used for internal consumers, which needs to be unique for each node.
kafkacache.group.id=kareldb-1

-# Whether to enable RocksDB in KCache
-rocksdb.enable=true
+# The backing cache
+kafkacache.backing.cache=rocksdb

-# The root directory for RocksDB storage
-rocksdb.root.dir=/tmp
+# The root directory for backing cache storage
+kafkacache.data.dir=/tmp

# If true, API requests that fail will include extra debugging information, including stack traces
debug=false
4 changes: 4 additions & 0 deletions kareldb-core/pom.xml
@@ -59,6 +59,10 @@ limitations under the License.
<groupId>io.kcache</groupId>
<artifactId>kcache-lmdb</artifactId>
</dependency>
+<dependency>
+<groupId>io.kcache</groupId>
+<artifactId>kcache-mapdb</artifactId>
+</dependency>
<dependency>
<groupId>io.kcache</groupId>
<artifactId>kcache-rocksdb</artifactId>
22 changes: 0 additions & 22 deletions kareldb-core/src/main/java/io/kareldb/KarelDbConfig.java
@@ -45,16 +45,6 @@ public class KarelDbConfig extends KafkaCacheConfig {
"If true, this node can participate in leader election. In a multi-colo setup, turn this off "
+ "for clusters in the replica data center.";

-public static final String ROCKS_DB_ENABLE_CONFIG = "rocksdb.enable";
-public static final boolean ROCKS_DB_ENABLE_DEFAULT = true;
-public static final String ROCKS_DB_ENABLE_DOC =
-"Whether to enable RocksDB within KCache.";
-
-public static final String ROCKS_DB_ROOT_DIR_CONFIG = "rocksdb.root.dir";
-public static final String ROCKS_DB_ROOT_DIR_DEFAULT = "/tmp";
-public static final String ROCKS_DB_ROOT_DIR_DOC =
-"Root directory for RocksDB storage.";
-
public static final String SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location";
public static final String SSL_KEYSTORE_LOCATION_DOC =
"Location of the keystore file to use for SSL. This is required for HTTPS.";
@@ -192,18 +182,6 @@ public class KarelDbConfig extends KafkaCacheConfig {
LEADER_ELIGIBILITY_DEFAULT,
Importance.MEDIUM,
LEADER_ELIGIBILITY_DOC
-).define(
-ROCKS_DB_ENABLE_CONFIG,
-Type.BOOLEAN,
-ROCKS_DB_ENABLE_DEFAULT,
-Importance.MEDIUM,
-ROCKS_DB_ENABLE_DOC
-).define(
-ROCKS_DB_ROOT_DIR_CONFIG,
-Type.STRING,
-ROCKS_DB_ROOT_DIR_DEFAULT,
-Importance.MEDIUM,
-ROCKS_DB_ROOT_DIR_DOC
).define(
SSL_KEYSTORE_LOCATION_CONFIG,
Type.STRING,
12 changes: 1 addition & 11 deletions kareldb-core/src/main/java/io/kareldb/kafka/KafkaTable.java
@@ -17,7 +17,6 @@
package io.kareldb.kafka;

import com.google.common.collect.ImmutableMap;
-import io.kareldb.KarelDbConfig;
import io.kareldb.avro.AvroKeyComparator;
import io.kareldb.avro.AvroUtils;
import io.kareldb.kafka.serialization.KafkaKeySerde;
@@ -30,8 +29,6 @@
import io.kcache.Cache;
import io.kcache.KafkaCache;
import io.kcache.KafkaCacheConfig;
-import io.kcache.rocksdb.RocksDBCache;
-import io.kcache.utils.InMemoryCache;
import io.kcache.utils.TransformedRawCache;
import org.apache.avro.Conversions;
import org.apache.avro.JsonProperties;
@@ -101,16 +98,9 @@ public void configure(Map<String, ?> operand) {
configs.put(KafkaCacheConfig.KAFKACACHE_TOPIC_CONFIG, topic);
configs.put(KafkaCacheConfig.KAFKACACHE_GROUP_ID_CONFIG, groupId);
configs.put(KafkaCacheConfig.KAFKACACHE_CLIENT_ID_CONFIG, groupId + "-" + topic);
-String enableRocksDbStr = (String) configs.getOrDefault(KarelDbConfig.ROCKS_DB_ENABLE_CONFIG, "true");
-boolean enableRocksDb = Boolean.parseBoolean(enableRocksDbStr);
-String rootDir = (String) configs.getOrDefault(
-KarelDbConfig.ROCKS_DB_ROOT_DIR_CONFIG, KarelDbConfig.ROCKS_DB_ROOT_DIR_DEFAULT);
Comparator<byte[]> cmp = new AvroKeyComparator(schemas.left);
-Cache<byte[], byte[]> cache = enableRocksDb
-? new RocksDBCache<>(topic, "rocksdb", rootDir, Serdes.ByteArray(), Serdes.ByteArray(), cmp)
-: new InMemoryCache<>(cmp);
this.rows = new KafkaCache<>(
-new KafkaCacheConfig(configs), Serdes.ByteArray(), Serdes.ByteArray(), null, cache);
+new KafkaCacheConfig(configs), Serdes.ByteArray(), Serdes.ByteArray(), null, topic, cmp);
}

@Override
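The KafkaTable change above is the heart of the upgrade: the caller no longer builds a `RocksDBCache` or `InMemoryCache` itself, because the KCache 3.3.1 `KafkaCache` constructor shown in the diff takes the topic and key comparator and wires up the backing cache from the `kafkacache.backing.cache` / `kafkacache.data.dir` settings. Below is a standalone sketch of that constructor shape, not part of the diff: the topic name and config values are illustrative, and a plain byte-wise comparator stands in for KarelDB's `AvroKeyComparator`.

```java
import io.kcache.Cache;
import io.kcache.KafkaCache;
import io.kcache.KafkaCacheConfig;
import org.apache.kafka.common.serialization.Serdes;

import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

public class BackingCacheSketch {
    public static void main(String[] args) throws Exception {
        String topic = "mytable_kv";                                    // illustrative topic
        Map<String, Object> configs = new HashMap<>();
        configs.put("kafkacache.bootstrap.servers", "localhost:9092");  // illustrative
        configs.put(KafkaCacheConfig.KAFKACACHE_TOPIC_CONFIG, topic);
        configs.put("kafkacache.backing.cache", "rocksdb");             // memory | bdbje | lmdb | rocksdb
        configs.put("kafkacache.data.dir", "/tmp");

        // Signed lexicographic byte comparator, purely as a stand-in for
        // new AvroKeyComparator(schemas.left) used by KafkaTable.
        Comparator<byte[]> cmp = (a, b) -> {
            int n = Math.min(a.length, b.length);
            for (int i = 0; i < n; i++) {
                int d = Byte.compare(a[i], b[i]);
                if (d != 0) return d;
            }
            return Integer.compare(a.length, b.length);
        };

        // Same constructor shape as the diff: the backing cache is created
        // inside KafkaCache from the kafkacache.* settings (fourth argument
        // left null, as in the KafkaTable code above).
        Cache<byte[], byte[]> rows = new KafkaCache<>(
            new KafkaCacheConfig(configs), Serdes.ByteArray(), Serdes.ByteArray(),
            null, topic, cmp);
        rows.init();   // connect to Kafka and hydrate the local backing cache
        rows.close();
    }
}
```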
@@ -99,7 +99,7 @@ private List<GenericRecord> toArray(NavigableMap<Long, VersionedValue> object) {
builder.set(field.e, versionedValue.isDeleted());
} else {
if (!versionedValue.isDeleted()) {
-Comparable v = AvroSchema.toAvroValue(field.e.schema(), value[field.i - 3]);
+Object v = AvroSchema.toAvroValue(field.e.schema(), value[field.i - 3]);
if (v != null) {
builder.set(field.e, v);
}
@@ -490,8 +490,8 @@ protected Properties createProperties() {
"org.apache.calcite.sql.ddl.ExtensionDdlExecutor#PARSER_FACTORY");
properties.put("schema.kind", "io.kareldb.kafka.KafkaSchema");
properties.put("schema.kafkacache.bootstrap.servers", bootstrapServers);
properties.put("schema.rocksdb.enable", "true");
properties.put("schema.rocksdb.root.dir", tempDir.getAbsolutePath());
properties.put("schema.kafkacache.backing.cache", "rocksdb");
properties.put("schema.kafkacache.data.dir", tempDir.getAbsolutePath());
return properties;
}

@@ -90,7 +90,7 @@ private void setUpServer() {
protected void injectKarelDbProperties(Properties props) {
props.put(KarelDbConfig.LISTENERS_CONFIG, "http://0.0.0.0:" + serverPort);
props.put(KarelDbConfig.KAFKACACHE_BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-props.put(KarelDbConfig.ROCKS_DB_ROOT_DIR_CONFIG, tempDir.getAbsolutePath());
+props.put(KarelDbConfig.KAFKACACHE_DATA_DIR_CONFIG, tempDir.getAbsolutePath());
}

/**
7 changes: 6 additions & 1 deletion pom.xml
@@ -70,7 +70,7 @@ limitations under the License.
<junit.version>4.12</junit.version>
<kafka.scala.version>2.13</kafka.scala.version>
<kafka.version>2.6.0</kafka.version>
-<kcache.version>3.3.0</kcache.version>
+<kcache.version>3.3.1</kcache.version>
<maven-dependency-plugin.version>3.1.2</maven-dependency-plugin.version>
<maven-enforcer-plugin.version>3.0.0-M3</maven-enforcer-plugin.version>
<maven-scm-provider.version>1.10.0</maven-scm-provider.version>
@@ -169,6 +169,11 @@ limitations under the License.
<artifactId>kcache-lmdb</artifactId>
<version>${kcache.version}</version>
</dependency>
+<dependency>
+<groupId>io.kcache</groupId>
+<artifactId>kcache-mapdb</artifactId>
+<version>${kcache.version}</version>
+</dependency>
<dependency>
<groupId>io.kcache</groupId>
<artifactId>kcache-rocksdb</artifactId>
