From 79439f6e54af5df1a5c06143e7c26a01c5cec2a3 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Mon, 7 Sep 2020 23:52:17 -0700 Subject: [PATCH 1/3] Upgrade to KCache 3.3.1 --- README.md | 7 +++--- config/kareldb-inmemory.properties | 8 +++---- config/kareldb-replica.properties | 8 +++---- config/kareldb-secure-replica.properties | 8 +++---- config/kareldb-secure.properties | 8 +++---- config/kareldb.properties | 8 +++---- .../main/java/io/kareldb/KarelDbConfig.java | 22 ------------------- .../java/io/kareldb/kafka/KafkaTable.java | 9 +------- .../serialization/KafkaValueSerializer.java | 2 +- .../io/kareldb/jdbc/BaseJDBCTestCase.java | 4 ++-- .../utils/RemoteClusterTestHarness.java | 2 +- pom.xml | 2 +- 12 files changed, 30 insertions(+), 58 deletions(-) diff --git a/README.md b/README.md index b38d63b9..e0e95cf9 100644 --- a/README.md +++ b/README.md @@ -70,7 +70,7 @@ properties.put("schemaFactory", "io.kareldb.schema.SchemaFactory"); properties.put("parserFactory", "org.apache.calcite.sql.parser.parserextension.ExtensionSqlParserImpl#FACTORY"); properties.put("schema.kind", "io.kareldb.kafka.KafkaSchema"); properties.put("schema.kafkacache.bootstrap.servers", bootstrapServers); -properties.put("schema.rocksdb.root.dir", "/tmp"); +properties.put("schema.kafkacache.data.dir", "/tmp"); try (Connection conn = DriverManager.getConnection("jdbc:kareldb:", properties); Statement s = conn.createStatement()) { @@ -125,8 +125,9 @@ KarelDB has a number of configuration properties that can be specified. When us - `listeners` - List of listener URLs that include the scheme, host, and port. Defaults to `http://0.0.0.0:8765`. - `cluster.group.id` - The group ID to be used for leader election. Defaults to `kareldb`. - `leader.eligibility` - Whether this node can participate in leader election. Defaults to true. -- `rocksdb.enable` - Whether to use RocksDB in KCache. Defaults to true. Otherwise an in-memory cache is used. -- `rocksdb.root.dir` - The root directory for RocksDB storage. Defaults to `/tmp`. +- `kafkacache.backing.cache` - The backing cache for KCache, one of `memory` (default +), `bdbje`, `lmdb`, or `rocksdb`. +- `kafkacache.data.dir` - The root directory for backing cache storage. Defaults to `/tmp`. - `kafkacache.bootstrap.servers` - A list of host and port pairs to use for establishing the initial connection to Kafka. - `kafkacache.group.id` - The group ID to use for the internal consumers, which needs to be unique for each node. Defaults to `kareldb-1`. - `kafkacache.topic.replication.factor` - The replication factor for the internal topics created by KarelDB. Defaults to 3. diff --git a/config/kareldb-inmemory.properties b/config/kareldb-inmemory.properties index d0160064..c38e2ba1 100644 --- a/config/kareldb-inmemory.properties +++ b/config/kareldb-inmemory.properties @@ -25,11 +25,11 @@ kafkacache.bootstrap.servers=localhost:9092 # The group ID to be used for internal consumers, which needs to be unique for each node. kafkacache.group.id=kareldb-1 -# Whether to enable RocksDB in KCache -rocksdb.enable=false +# The backing cache +kafkacache.backing.cache=memory -# The root directory for RocksDB storage -rocksdb.root.dir=/tmp +# The root directory for backing cache storage +kafkacache.data.dir=/tmp # If true, API requests that fail will include extra debugging information, including stack traces debug=false diff --git a/config/kareldb-replica.properties b/config/kareldb-replica.properties index 44b45ee2..7e38a423 100644 --- a/config/kareldb-replica.properties +++ b/config/kareldb-replica.properties @@ -25,11 +25,11 @@ kafkacache.bootstrap.servers=localhost:9092 # The group ID to be used for internal consumers, which needs to be unique for each node. kafkacache.group.id=kareldb-2 -# Whether to enable RocksDB in KCache -rocksdb.enable=true +# The backing cache +kafkacache.backing.cache=rocksdb -# The root directory for RocksDB storage -rocksdb.root.dir=/tmp/replica +# The root directory for backing cache storage +kafkacache.data.dir=/tmp/replica # If true, API requests that fail will include extra debugging information, including stack traces debug=false diff --git a/config/kareldb-secure-replica.properties b/config/kareldb-secure-replica.properties index 1b740ae9..ed7ea03b 100644 --- a/config/kareldb-secure-replica.properties +++ b/config/kareldb-secure-replica.properties @@ -25,11 +25,11 @@ kafkacache.bootstrap.servers=localhost:9092 # The group ID to be used for internal consumers, which needs to be unique for each node. kafkacache.group.id=kareldb-2 -# Whether to enable RocksDB in KCache -rocksdb.enable=true +# The backing cache +kafkacache.backing.cache=rocksdb -# The root directory for RocksDB storage -rocksdb.root.dir=/tmp/replica +# The root directory for backing cache storage +kafkacache.data.dir=/tmp/replica ssl.keystore.location=custom.keystore ssl.keystore.password=changeme diff --git a/config/kareldb-secure.properties b/config/kareldb-secure.properties index ae6f037a..40de6fe8 100644 --- a/config/kareldb-secure.properties +++ b/config/kareldb-secure.properties @@ -25,11 +25,11 @@ kafkacache.bootstrap.servers=localhost:9092 # The group ID to be used for internal consumers, which needs to be unique for each node. kafkacache.group.id=kareldb-1 -# Whether to enable RocksDB in KCache -rocksdb.enable=true +# The backing cache +kafkacache.backing.cache=rocksdb -# The root directory for RocksDB storage -rocksdb.root.dir=/tmp +# The root directory for backing cache storage +kafkacache.data.dir=/tmp ssl.keystore.location=custom.keystore ssl.keystore.password=changeme diff --git a/config/kareldb.properties b/config/kareldb.properties index d55424df..7bbb3c87 100644 --- a/config/kareldb.properties +++ b/config/kareldb.properties @@ -25,11 +25,11 @@ kafkacache.bootstrap.servers=localhost:9092 # The group ID to be used for internal consumers, which needs to be unique for each node. kafkacache.group.id=kareldb-1 -# Whether to enable RocksDB in KCache -rocksdb.enable=true +# The backing cache +kafkacache.backing.cache=rocksdb -# The root directory for RocksDB storage -rocksdb.root.dir=/tmp +# The root directory for backing cache storage +kafkacache.data.dir=/tmp # If true, API requests that fail will include extra debugging information, including stack traces debug=false diff --git a/kareldb-core/src/main/java/io/kareldb/KarelDbConfig.java b/kareldb-core/src/main/java/io/kareldb/KarelDbConfig.java index 41682e51..b8e0ecab 100644 --- a/kareldb-core/src/main/java/io/kareldb/KarelDbConfig.java +++ b/kareldb-core/src/main/java/io/kareldb/KarelDbConfig.java @@ -45,16 +45,6 @@ public class KarelDbConfig extends KafkaCacheConfig { "If true, this node can participate in leader election. In a multi-colo setup, turn this off " + "for clusters in the replica data center."; - public static final String ROCKS_DB_ENABLE_CONFIG = "rocksdb.enable"; - public static final boolean ROCKS_DB_ENABLE_DEFAULT = true; - public static final String ROCKS_DB_ENABLE_DOC = - "Whether to enable RocksDB within KCache."; - - public static final String ROCKS_DB_ROOT_DIR_CONFIG = "rocksdb.root.dir"; - public static final String ROCKS_DB_ROOT_DIR_DEFAULT = "/tmp"; - public static final String ROCKS_DB_ROOT_DIR_DOC = - "Root directory for RocksDB storage."; - public static final String SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location"; public static final String SSL_KEYSTORE_LOCATION_DOC = "Location of the keystore file to use for SSL. This is required for HTTPS."; @@ -192,18 +182,6 @@ public class KarelDbConfig extends KafkaCacheConfig { LEADER_ELIGIBILITY_DEFAULT, Importance.MEDIUM, LEADER_ELIGIBILITY_DOC - ).define( - ROCKS_DB_ENABLE_CONFIG, - Type.BOOLEAN, - ROCKS_DB_ENABLE_DEFAULT, - Importance.MEDIUM, - ROCKS_DB_ENABLE_DOC - ).define( - ROCKS_DB_ROOT_DIR_CONFIG, - Type.STRING, - ROCKS_DB_ROOT_DIR_DEFAULT, - Importance.MEDIUM, - ROCKS_DB_ROOT_DIR_DOC ).define( SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, diff --git a/kareldb-core/src/main/java/io/kareldb/kafka/KafkaTable.java b/kareldb-core/src/main/java/io/kareldb/kafka/KafkaTable.java index aac59225..d6fc1edf 100644 --- a/kareldb-core/src/main/java/io/kareldb/kafka/KafkaTable.java +++ b/kareldb-core/src/main/java/io/kareldb/kafka/KafkaTable.java @@ -101,16 +101,9 @@ public void configure(Map operand) { configs.put(KafkaCacheConfig.KAFKACACHE_TOPIC_CONFIG, topic); configs.put(KafkaCacheConfig.KAFKACACHE_GROUP_ID_CONFIG, groupId); configs.put(KafkaCacheConfig.KAFKACACHE_CLIENT_ID_CONFIG, groupId + "-" + topic); - String enableRocksDbStr = (String) configs.getOrDefault(KarelDbConfig.ROCKS_DB_ENABLE_CONFIG, "true"); - boolean enableRocksDb = Boolean.parseBoolean(enableRocksDbStr); - String rootDir = (String) configs.getOrDefault( - KarelDbConfig.ROCKS_DB_ROOT_DIR_CONFIG, KarelDbConfig.ROCKS_DB_ROOT_DIR_DEFAULT); Comparator cmp = new AvroKeyComparator(schemas.left); - Cache cache = enableRocksDb - ? new RocksDBCache<>(topic, "rocksdb", rootDir, Serdes.ByteArray(), Serdes.ByteArray(), cmp) - : new InMemoryCache<>(cmp); this.rows = new KafkaCache<>( - new KafkaCacheConfig(configs), Serdes.ByteArray(), Serdes.ByteArray(), null, cache); + new KafkaCacheConfig(configs), Serdes.ByteArray(), Serdes.ByteArray(), null, topic, cmp); } @Override diff --git a/kareldb-core/src/main/java/io/kareldb/kafka/serialization/KafkaValueSerializer.java b/kareldb-core/src/main/java/io/kareldb/kafka/serialization/KafkaValueSerializer.java index 76738edb..d59642b4 100644 --- a/kareldb-core/src/main/java/io/kareldb/kafka/serialization/KafkaValueSerializer.java +++ b/kareldb-core/src/main/java/io/kareldb/kafka/serialization/KafkaValueSerializer.java @@ -99,7 +99,7 @@ private List toArray(NavigableMap object) { builder.set(field.e, versionedValue.isDeleted()); } else { if (!versionedValue.isDeleted()) { - Comparable v = AvroSchema.toAvroValue(field.e.schema(), value[field.i - 3]); + Object v = AvroSchema.toAvroValue(field.e.schema(), value[field.i - 3]); if (v != null) { builder.set(field.e, v); } diff --git a/kareldb-core/src/test/java/io/kareldb/jdbc/BaseJDBCTestCase.java b/kareldb-core/src/test/java/io/kareldb/jdbc/BaseJDBCTestCase.java index 14d41c14..c8804bdb 100644 --- a/kareldb-core/src/test/java/io/kareldb/jdbc/BaseJDBCTestCase.java +++ b/kareldb-core/src/test/java/io/kareldb/jdbc/BaseJDBCTestCase.java @@ -490,8 +490,8 @@ protected Properties createProperties() { "org.apache.calcite.sql.ddl.ExtensionDdlExecutor#PARSER_FACTORY"); properties.put("schema.kind", "io.kareldb.kafka.KafkaSchema"); properties.put("schema.kafkacache.bootstrap.servers", bootstrapServers); - properties.put("schema.rocksdb.enable", "true"); - properties.put("schema.rocksdb.root.dir", tempDir.getAbsolutePath()); + properties.put("schema.kafkacache.backing.cache", "rocksdb"); + properties.put("schema.kafkacache.data.dir", tempDir.getAbsolutePath()); return properties; } diff --git a/kareldb-server/src/test/java/io/kareldb/server/utils/RemoteClusterTestHarness.java b/kareldb-server/src/test/java/io/kareldb/server/utils/RemoteClusterTestHarness.java index 0cc2040a..1af45472 100644 --- a/kareldb-server/src/test/java/io/kareldb/server/utils/RemoteClusterTestHarness.java +++ b/kareldb-server/src/test/java/io/kareldb/server/utils/RemoteClusterTestHarness.java @@ -90,7 +90,7 @@ private void setUpServer() { protected void injectKarelDbProperties(Properties props) { props.put(KarelDbConfig.LISTENERS_CONFIG, "http://0.0.0.0:" + serverPort); props.put(KarelDbConfig.KAFKACACHE_BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put(KarelDbConfig.ROCKS_DB_ROOT_DIR_CONFIG, tempDir.getAbsolutePath()); + props.put(KarelDbConfig.KAFKACACHE_DATA_DIR_CONFIG, tempDir.getAbsolutePath()); } /** diff --git a/pom.xml b/pom.xml index 5ce3be1a..ca1a9635 100644 --- a/pom.xml +++ b/pom.xml @@ -70,7 +70,7 @@ limitations under the License. 4.12 2.13 2.6.0 - 3.3.0 + 3.3.1-SNAPSHOT 3.1.2 3.0.0-M3 1.10.0 From 2e55905ee586c0dd29c78f57e35014c5f1447484 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Tue, 8 Sep 2020 22:10:03 -0700 Subject: [PATCH 2/3] Add support for MapDB --- kareldb-core/pom.xml | 4 ++++ pom.xml | 5 +++++ 2 files changed, 9 insertions(+) diff --git a/kareldb-core/pom.xml b/kareldb-core/pom.xml index b4cfd9c6..51f764b8 100644 --- a/kareldb-core/pom.xml +++ b/kareldb-core/pom.xml @@ -59,6 +59,10 @@ limitations under the License. io.kcache kcache-lmdb + + io.kcache + kcache-mapdb + io.kcache kcache-rocksdb diff --git a/pom.xml b/pom.xml index ca1a9635..d7b1fa82 100644 --- a/pom.xml +++ b/pom.xml @@ -169,6 +169,11 @@ limitations under the License. kcache-lmdb ${kcache.version} + + io.kcache + kcache-mapdb + ${kcache.version} + io.kcache kcache-rocksdb From 8e0fd6dbd8b15f1a5eeff6c3c738d877581c65df Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Sat, 12 Sep 2020 11:15:40 -0700 Subject: [PATCH 3/3] Minor fixes --- kareldb-core/src/main/java/io/kareldb/kafka/KafkaTable.java | 3 --- pom.xml | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/kareldb-core/src/main/java/io/kareldb/kafka/KafkaTable.java b/kareldb-core/src/main/java/io/kareldb/kafka/KafkaTable.java index d6fc1edf..9dcf3d4c 100644 --- a/kareldb-core/src/main/java/io/kareldb/kafka/KafkaTable.java +++ b/kareldb-core/src/main/java/io/kareldb/kafka/KafkaTable.java @@ -17,7 +17,6 @@ package io.kareldb.kafka; import com.google.common.collect.ImmutableMap; -import io.kareldb.KarelDbConfig; import io.kareldb.avro.AvroKeyComparator; import io.kareldb.avro.AvroUtils; import io.kareldb.kafka.serialization.KafkaKeySerde; @@ -30,8 +29,6 @@ import io.kcache.Cache; import io.kcache.KafkaCache; import io.kcache.KafkaCacheConfig; -import io.kcache.rocksdb.RocksDBCache; -import io.kcache.utils.InMemoryCache; import io.kcache.utils.TransformedRawCache; import org.apache.avro.Conversions; import org.apache.avro.JsonProperties; diff --git a/pom.xml b/pom.xml index d7b1fa82..8c6da7bc 100644 --- a/pom.xml +++ b/pom.xml @@ -70,7 +70,7 @@ limitations under the License. 4.12 2.13 2.6.0 - 3.3.1-SNAPSHOT + 3.3.1 3.1.2 3.0.0-M3 1.10.0