From 273a5129d0f5ff95f6979fa237292693ff915cf8 Mon Sep 17 00:00:00 2001 From: Lei Wang Date: Fri, 21 Jun 2024 18:06:58 +0800 Subject: [PATCH] support converter failover Signed-off-by: Lei Wang --- .../gart/templates/converter/deployment.yaml | 13 ++- converter/binlog_convert.cc | 108 ++++++++++++++++-- converter/kafka_helper.h | 51 ++++++++- converter/parser.cc | 4 +- converter/parser.h | 2 +- vegito/src/framework/bench_runner.cc | 2 + 6 files changed, 165 insertions(+), 15 deletions(-) diff --git a/charts/gart/templates/converter/deployment.yaml b/charts/gart/templates/converter/deployment.yaml index 873a8b3..c01880b 100644 --- a/charts/gart/templates/converter/deployment.yaml +++ b/charts/gart/templates/converter/deployment.yaml @@ -35,10 +35,20 @@ spec: cd /workspace/gart/build && until nc -z {{ $kafka_service_name }} {{ $kafka_service_port }}; do echo waiting for kafka; sleep 5; done && until nc -z {{ $etcd_service_name }} {{ $etcd_service_port }}; do echo waiting for etcd; sleep 5; done && - echo "Applying Debezium connector configuration..." && export PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python && CONFIG=$(/workspace/gart/scripts/update_debezium_config.py {{ $etcd_service }} {{ .Values.dataconfig.etcdPrefix }} {{ .Values.dataconfig.dbName }} {{ .Values.dataconfig.dbType }}) && + export ETCDCTL_API=3 && + etcd_prefix=$(echo {{ .Values.dataconfig.etcdPrefix }}) && + etcd_key=$etcd_prefix"debezium_request_is_sent" && + etcd_endpoint=$(echo {{ $etcd_service }}) && + echo $etcd_endpoint && + echo $etcd_key && while : ; do + if [ "$(etcdctl --endpoints=$etcd_endpoint get $etcd_key | awk 'NR==2')" == "True" ]; then + echo "Debezium service is already ready." + break + fi + echo "Applying Debezium connector configuration..." HTTP_STATUS=$(curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \ --data "$CONFIG" \ http://{{ $debezium_service }}/connectors/ \ @@ -46,6 +56,7 @@ spec: if [ "$HTTP_STATUS" -eq 200 ] || [ "$HTTP_STATUS" -eq 201 ]; then echo "Debezium service is ready." + etcdctl --endpoints=$etcd_endpoint put $etcd_key "True" break else echo "Debezium service not ready, waiting... Status: $HTTP_STATUS" diff --git a/converter/binlog_convert.cc b/converter/binlog_convert.cc index 01a7bd2..c7619e5 100644 --- a/converter/binlog_convert.cc +++ b/converter/binlog_convert.cc @@ -13,12 +13,15 @@ * limitations under the License. */ +#include #include #include +#include #include "converter/flags.h" #include "converter/kafka_helper.h" #include "converter/parser.h" +#include "util/macros.h" using converter::KafkaConsumer; using converter::KafkaOutputStream; @@ -39,8 +42,6 @@ int main(int argc, char** argv) { google::InitGoogleLogging(argv[0]); gflags::ParseCommandLineFlags(&argc, &argv, true); - shared_ptr consumer = make_shared( - FLAGS_read_kafka_broker_list, FLAGS_read_kafka_topic, "gart_consumer"); shared_ptr producer = make_shared( FLAGS_write_kafka_broker_list, FLAGS_write_kafka_topic); KafkaOutputStream ostream(producer); @@ -53,6 +54,52 @@ int main(int argc, char** argv) { int epoch = 0; + // catch up mode is used for failover + bool catch_up_mode = true; + int64_t last_processed_offset = 0; + int64_t processed_count = 0; + + shared_ptr consumer_for_get_last_commit = + make_shared(FLAGS_write_kafka_broker_list, + FLAGS_write_kafka_topic, "gart_consumer", 0, + RdKafka::Topic::OFFSET_BEGINNING); + bool topic_exist = consumer_for_get_last_commit->topic_exist(); + + if (!topic_exist) { + consumer_for_get_last_commit->stop(); + std::cout << "Empty...Will start to read topic messages from beginning." + << std::endl; + } else { + std::pair low_high_pair = + consumer_for_get_last_commit->query_watermark_offsets(); + consumer_for_get_last_commit->stop(); + int64_t high_offset = low_high_pair.second; + std::cout << "Low offset: " << low_high_pair.first + << ", High offset: " << low_high_pair.second << std::endl; + if (high_offset == 0) { + std::cout << "Will start to read topic messages from beginning." + << std::endl; + } else { + consumer_for_get_last_commit->start(high_offset - 1); + RdKafka::Message* last_commit_msg = + consumer_for_get_last_commit->consume(nullptr, 1000); + string line(static_cast(last_commit_msg->payload()), + last_commit_msg->len()); + std::cout << "Last message is " << line << std::endl; + char delimiter = '|'; + size_t pos = line.find_last_of(delimiter); + last_processed_offset = std::stoll(line.substr(pos + 1)); + std::cout << "Will start to read topic messages from " + << last_processed_offset << std::endl; + consumer_for_get_last_commit->delete_message(last_commit_msg); + consumer_for_get_last_commit->stop(); + } + } + + shared_ptr consumer = make_shared( + FLAGS_read_kafka_broker_list, FLAGS_read_kafka_topic, "gart_consumer", 0, + RdKafka::Topic::OFFSET_BEGINNING); + #ifdef USE_DEBEZIUM // used for consistent epoch calculation int last_tx_id = -1; @@ -86,7 +133,16 @@ int main(int argc, char** argv) { } while (log_entry.more_entires()) { - ostream << log_entry.to_string() << flush; + if (catch_up_mode) { + if (processed_count >= last_processed_offset) { + catch_up_mode = false; + } + } + processed_count++; + if (likely(!catch_up_mode)) { + ostream << log_entry.to_string(processed_count) << flush; + } + GART_CHECK_OK(parser.parse(log_entry, line, epoch)); if (!log_entry.valid()) { @@ -94,7 +150,17 @@ int main(int argc, char** argv) { } } - ostream << log_entry.to_string() << flush; + if (catch_up_mode) { + if (processed_count >= last_processed_offset) { + catch_up_mode = false; + } + } + + processed_count++; + if (likely(!catch_up_mode)) { + ostream << log_entry.to_string(processed_count) << flush; + } + consumer->delete_message(msg); if (log_entry.last_snapshot()) { @@ -103,7 +169,16 @@ int main(int argc, char** argv) { last_tx_id = log_entry.get_tx_id(); last_log_count = log_count; epoch = 1; - ostream << LogEntry::bulk_load_end().to_string() << flush; + if (catch_up_mode) { + if (processed_count >= last_processed_offset) { + catch_up_mode = false; + } + } + processed_count++; + if (!catch_up_mode) { + ostream << LogEntry::bulk_load_end().to_string(processed_count) + << flush; + } break; } @@ -175,7 +250,16 @@ int main(int argc, char** argv) { #endif while (log_entry.more_entires()) { - ostream << log_entry.to_string() << flush; + if (catch_up_mode) { + if (processed_count >= last_processed_offset) { + catch_up_mode = false; + } + } + processed_count++; + if (!catch_up_mode) { + ostream << log_entry.to_string(processed_count) << flush; + } + GART_CHECK_OK(parser.parse(log_entry, line, epoch)); if (!log_entry.valid()) { @@ -183,7 +267,17 @@ int main(int argc, char** argv) { } } - ostream << log_entry.to_string() << flush; + if (catch_up_mode) { + if (processed_count >= last_processed_offset) { + catch_up_mode = false; + } + } + + processed_count++; + if (!catch_up_mode) { + ostream << log_entry.to_string(processed_count) << flush; + } + consumer->delete_message(msg); } } diff --git a/converter/kafka_helper.h b/converter/kafka_helper.h index 928099e..0e6a2f0 100644 --- a/converter/kafka_helper.h +++ b/converter/kafka_helper.h @@ -19,6 +19,7 @@ #include #include #include +#include #include "glog/logging.h" #include "librdkafka/rdkafkacpp.h" @@ -128,12 +129,14 @@ class KafkaOutputStream : public std::ostream { */ class KafkaConsumer { public: - explicit KafkaConsumer(const std::string& broker_list, - const std::string& topic, const std::string& group_id, - int partition = 0) + explicit KafkaConsumer( + const std::string& broker_list, const std::string& topic, + const std::string& group_id, int partition = 0, + int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING) : brokers_(broker_list), topic_(topic), group_id_(group_id), + start_offset_(start_offset), partition_(partition) { RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); std::string rdkafka_err; @@ -165,10 +168,9 @@ class KafkaConsumer { topic_ptr_ = RdKafka::Topic::create(consumer_.get(), topic_, tconf, rdkafka_err); - int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING; RdKafka::ErrorCode resp = - consumer_->start(topic_ptr_, partition_, start_offset); + consumer_->start(topic_ptr_, partition_, start_offset_); if (resp != RdKafka::ERR_NO_ERROR) { LOG(ERROR) << "Failed to start consumer: " << RdKafka::err2str(resp); @@ -217,6 +219,44 @@ class KafkaConsumer { } } + inline std::pair query_watermark_offsets() const { + int64_t low, high; + RdKafka::ErrorCode err = consumer_->query_watermark_offsets( + topic_, partition_, &low, &high, 5000); + if (err != RdKafka::ERR_NO_ERROR) { + LOG(INFO) << "Failed to query watermark offsets: " + << RdKafka::err2str(err); + return std::make_pair(0, 0); + } + return std::make_pair(low, high); + } + + inline RdKafka::ErrorCode stop() const { + return consumer_->stop(topic_ptr_, partition_); + } + + inline RdKafka::ErrorCode seek(int64_t offset) const { + return consumer_->seek(topic_ptr_, partition_, offset, 5000); + } + + inline RdKafka::ErrorCode start(int64_t new_offset) const { + return consumer_->start(topic_ptr_, partition_, new_offset); + } + + inline bool topic_exist() { + RdKafka::Metadata* metadata = nullptr; + RdKafka::ErrorCode metadata_err = + consumer_->metadata(false, topic_ptr_, &metadata, 5000); + delete metadata; + if (metadata_err != RdKafka::ERR_NO_ERROR) { + std::cout << "Failed to get metadata: " << RdKafka::err2str(metadata_err) + << std::endl; + return false; + } else { + return true; + } + } + ~KafkaConsumer() = default; private: @@ -224,6 +264,7 @@ class KafkaConsumer { const std::string topic_; const std::string group_id_; const int partition_; + const int64_t start_offset_; RdKafka::Topic* topic_ptr_; std::unique_ptr consumer_; diff --git a/converter/parser.cc b/converter/parser.cc index 9f48dab..5c7c56c 100644 --- a/converter/parser.cc +++ b/converter/parser.cc @@ -94,11 +94,12 @@ LogEntry LogEntry::bulk_load_end() { return entry; } -string LogEntry::to_string() const { +string LogEntry::to_string(int64_t binlog_offset) const { string base; if (op_type == OpType::BULKLOAD_END) { base = "bulkload_end"; append_str(base, epoch); + append_str(base, binlog_offset); return base; } @@ -133,6 +134,7 @@ string LogEntry::to_string() const { append_str(base, str); } + append_str(base, binlog_offset); return base; } diff --git a/converter/parser.h b/converter/parser.h index 594ccb9..6c977ef 100644 --- a/converter/parser.h +++ b/converter/parser.h @@ -39,7 +39,7 @@ class LogEntry { static LogEntry bulk_load_end(); - std::string to_string() const; + std::string to_string(int64_t binlog_offset) const; int get_tx_id() const { return tx_id; } diff --git a/vegito/src/framework/bench_runner.cc b/vegito/src/framework/bench_runner.cc index 981f687..dd2ad89 100644 --- a/vegito/src/framework/bench_runner.cc +++ b/vegito/src/framework/bench_runner.cc @@ -544,6 +544,8 @@ void Runner::apply_log_to_store_(const string_view& log, int p_id) { } sv_vec.erase(sv_vec.begin(), sv_vec.begin() + 1); + // remove the last element of sv_vec, since the last element of sv_vec is offset of binlog + sv_vec.pop_back(); if (op == "add_vertex") { process_add_vertex(sv_vec, graph_stores_[p_id]);