support converter failover
Signed-off-by: Lei Wang <[email protected]>
doudoubobo committed Jun 21, 2024
1 parent 3331667 commit 273a512
Showing 6 changed files with 165 additions and 15 deletions.
13 changes: 12 additions & 1 deletion charts/gart/templates/converter/deployment.yaml
@@ -35,17 +35,28 @@ spec:
cd /workspace/gart/build &&
until nc -z {{ $kafka_service_name }} {{ $kafka_service_port }}; do echo waiting for kafka; sleep 5; done &&
until nc -z {{ $etcd_service_name }} {{ $etcd_service_port }}; do echo waiting for etcd; sleep 5; done &&
echo "Applying Debezium connector configuration..." &&
export PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python &&
CONFIG=$(/workspace/gart/scripts/update_debezium_config.py {{ $etcd_service }} {{ .Values.dataconfig.etcdPrefix }} {{ .Values.dataconfig.dbName }} {{ .Values.dataconfig.dbType }}) &&
export ETCDCTL_API=3 &&
etcd_prefix=$(echo {{ .Values.dataconfig.etcdPrefix }}) &&
etcd_key=$etcd_prefix"debezium_request_is_sent" &&
etcd_endpoint=$(echo {{ $etcd_service }}) &&
echo $etcd_endpoint &&
echo $etcd_key &&
while : ; do
if [ "$(etcdctl --endpoints=$etcd_endpoint get $etcd_key | awk 'NR==2')" == "True" ]; then
echo "Debezium service is already ready."
break
fi
echo "Applying Debezium connector configuration..."
HTTP_STATUS=$(curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
--data "$CONFIG" \
http://{{ $debezium_service }}/connectors/ \
-o /dev/null -w '%{http_code}')
if [ "$HTTP_STATUS" -eq 200 ] || [ "$HTTP_STATUS" -eq 201 ]; then
echo "Debezium service is ready."
etcdctl --endpoints=$etcd_endpoint put $etcd_key "True"
break
else
echo "Debezium service not ready, waiting... Status: $HTTP_STATUS"
108 changes: 101 additions & 7 deletions converter/binlog_convert.cc
@@ -13,12 +13,15 @@
* limitations under the License.
*/

#include <sys/types.h>
#include <chrono>
#include <fstream>
#include <iostream>

#include "converter/flags.h"
#include "converter/kafka_helper.h"
#include "converter/parser.h"
#include "util/macros.h"

using converter::KafkaConsumer;
using converter::KafkaOutputStream;
@@ -39,8 +42,6 @@ int main(int argc, char** argv) {
google::InitGoogleLogging(argv[0]);
gflags::ParseCommandLineFlags(&argc, &argv, true);

shared_ptr<KafkaConsumer> consumer = make_shared<KafkaConsumer>(
FLAGS_read_kafka_broker_list, FLAGS_read_kafka_topic, "gart_consumer");
shared_ptr<KafkaProducer> producer = make_shared<KafkaProducer>(
FLAGS_write_kafka_broker_list, FLAGS_write_kafka_topic);
KafkaOutputStream ostream(producer);
@@ -53,6 +54,52 @@ int main(int argc, char** argv) {

int epoch = 0;

// catch-up mode is used for failover: entries already written to the output topic are re-parsed but not re-published
bool catch_up_mode = true;
int64_t last_processed_offset = 0;
int64_t processed_count = 0;

shared_ptr<KafkaConsumer> consumer_for_get_last_commit =
make_shared<KafkaConsumer>(FLAGS_write_kafka_broker_list,
FLAGS_write_kafka_topic, "gart_consumer", 0,
RdKafka::Topic::OFFSET_BEGINNING);
bool topic_exist = consumer_for_get_last_commit->topic_exist();

if (!topic_exist) {
consumer_for_get_last_commit->stop();
std::cout << "Empty...Will start to read topic messages from beginning."
<< std::endl;
} else {
std::pair<int64_t, int64_t> low_high_pair =
consumer_for_get_last_commit->query_watermark_offsets();
consumer_for_get_last_commit->stop();
int64_t high_offset = low_high_pair.second;
std::cout << "Low offset: " << low_high_pair.first
<< ", High offset: " << low_high_pair.second << std::endl;
if (high_offset == 0) {
std::cout << "Will start to read topic messages from beginning."
<< std::endl;
} else {
consumer_for_get_last_commit->start(high_offset - 1);
RdKafka::Message* last_commit_msg =
consumer_for_get_last_commit->consume(nullptr, 1000);
string line(static_cast<const char*>(last_commit_msg->payload()),
last_commit_msg->len());
std::cout << "Last message is " << line << std::endl;
char delimiter = '|';
size_t pos = line.find_last_of(delimiter);
last_processed_offset = std::stoll(line.substr(pos + 1));
std::cout << "Will start to read topic messages from "
<< last_processed_offset << std::endl;
consumer_for_get_last_commit->delete_message(last_commit_msg);
consumer_for_get_last_commit->stop();
}
}

shared_ptr<KafkaConsumer> consumer = make_shared<KafkaConsumer>(
FLAGS_read_kafka_broker_list, FLAGS_read_kafka_topic, "gart_consumer", 0,
RdKafka::Topic::OFFSET_BEGINNING);

#ifdef USE_DEBEZIUM
// used for consistent epoch calculation
int last_tx_id = -1;
Expand Down Expand Up @@ -86,15 +133,34 @@ int main(int argc, char** argv) {
}

while (log_entry.more_entires()) {
ostream << log_entry.to_string() << flush;
if (catch_up_mode) {
if (processed_count >= last_processed_offset) {
catch_up_mode = false;
}
}
processed_count++;
if (likely(!catch_up_mode)) {
ostream << log_entry.to_string(processed_count) << flush;
}

GART_CHECK_OK(parser.parse(log_entry, line, epoch));

if (!log_entry.valid()) {
break;
}
}

ostream << log_entry.to_string() << flush;
if (catch_up_mode) {
if (processed_count >= last_processed_offset) {
catch_up_mode = false;
}
}

processed_count++;
if (likely(!catch_up_mode)) {
ostream << log_entry.to_string(processed_count) << flush;
}

consumer->delete_message(msg);

if (log_entry.last_snapshot()) {
@@ -103,7 +169,16 @@ int main(int argc, char** argv) {
last_tx_id = log_entry.get_tx_id();
last_log_count = log_count;
epoch = 1;
ostream << LogEntry::bulk_load_end().to_string() << flush;
if (catch_up_mode) {
if (processed_count >= last_processed_offset) {
catch_up_mode = false;
}
}
processed_count++;
if (!catch_up_mode) {
ostream << LogEntry::bulk_load_end().to_string(processed_count)
<< flush;
}
break;
}

@@ -175,15 +250,34 @@ int main(int argc, char** argv) {
#endif

while (log_entry.more_entires()) {
ostream << log_entry.to_string() << flush;
if (catch_up_mode) {
if (processed_count >= last_processed_offset) {
catch_up_mode = false;
}
}
processed_count++;
if (!catch_up_mode) {
ostream << log_entry.to_string(processed_count) << flush;
}

GART_CHECK_OK(parser.parse(log_entry, line, epoch));

if (!log_entry.valid()) {
break;
}
}

ostream << log_entry.to_string() << flush;
if (catch_up_mode) {
if (processed_count >= last_processed_offset) {
catch_up_mode = false;
}
}

processed_count++;
if (!catch_up_mode) {
ostream << log_entry.to_string(processed_count) << flush;
}

consumer->delete_message(msg);
}
}
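
The guard that wraps each write above is repeated for every emission path (per-entry, end-of-entry, and bulk-load end). As a reading aid only, not part of the commit, the repeated pattern is equivalent to a small helper over the same variables:

// Hypothetical helper, equivalent to the guard repeated throughout binlog_convert.cc.
// During catch-up, entries are counted but not re-published; publishing resumes
// once processed_count has passed the last_processed_offset recovered at start-up.
auto maybe_emit = [&](const LogEntry& entry) {
  if (catch_up_mode && processed_count >= last_processed_offset) {
    catch_up_mode = false;  // the output topic already contains everything up to here
  }
  processed_count++;
  if (!catch_up_mode) {
    ostream << entry.to_string(processed_count) << flush;
  }
};
// Usage would be maybe_emit(log_entry); or maybe_emit(LogEntry::bulk_load_end());
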
51 changes: 46 additions & 5 deletions converter/kafka_helper.h
@@ -19,6 +19,7 @@
#include <iostream>
#include <memory>
#include <string>
#include <utility>

#include "glog/logging.h"
#include "librdkafka/rdkafkacpp.h"
@@ -128,12 +129,14 @@
*/
class KafkaConsumer {
public:
explicit KafkaConsumer(const std::string& broker_list,
const std::string& topic, const std::string& group_id,
int partition = 0)
explicit KafkaConsumer(
const std::string& broker_list, const std::string& topic,
const std::string& group_id, int partition = 0,
int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING)
: brokers_(broker_list),
topic_(topic),
group_id_(group_id),
start_offset_(start_offset),
partition_(partition) {
RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
std::string rdkafka_err;
@@ -165,10 +168,9 @@

topic_ptr_ =
RdKafka::Topic::create(consumer_.get(), topic_, tconf, rdkafka_err);
int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;

RdKafka::ErrorCode resp =
consumer_->start(topic_ptr_, partition_, start_offset);
consumer_->start(topic_ptr_, partition_, start_offset_);

if (resp != RdKafka::ERR_NO_ERROR) {
LOG(ERROR) << "Failed to start consumer: " << RdKafka::err2str(resp);
@@ -217,13 +219,52 @@
}
}

inline std::pair<int64_t, int64_t> query_watermark_offsets() const {
int64_t low, high;
RdKafka::ErrorCode err = consumer_->query_watermark_offsets(
topic_, partition_, &low, &high, 5000);
if (err != RdKafka::ERR_NO_ERROR) {
LOG(INFO) << "Failed to query watermark offsets: "
<< RdKafka::err2str(err);
return std::make_pair(0, 0);
}
return std::make_pair(low, high);
}

inline RdKafka::ErrorCode stop() const {
return consumer_->stop(topic_ptr_, partition_);
}

inline RdKafka::ErrorCode seek(int64_t offset) const {
return consumer_->seek(topic_ptr_, partition_, offset, 5000);
}

inline RdKafka::ErrorCode start(int64_t new_offset) const {
return consumer_->start(topic_ptr_, partition_, new_offset);
}

inline bool topic_exist() {
RdKafka::Metadata* metadata = nullptr;
RdKafka::ErrorCode metadata_err =
consumer_->metadata(false, topic_ptr_, &metadata, 5000);
delete metadata;
if (metadata_err != RdKafka::ERR_NO_ERROR) {
std::cout << "Failed to get metadata: " << RdKafka::err2str(metadata_err)
<< std::endl;
return false;
} else {
return true;
}
}

~KafkaConsumer() = default;

private:
const std::string brokers_;
const std::string topic_;
const std::string group_id_;
const int partition_;
const int64_t start_offset_;

RdKafka::Topic* topic_ptr_;
std::unique_ptr<RdKafka::Consumer> consumer_;
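
A minimal usage sketch of the new consumer interfaces, mirroring the start-up recovery in binlog_convert.cc above. The function name, broker and topic parameters, and the 1000 ms consume timeout are illustrative; only the KafkaConsumer calls come from this header:

#include <cstdint>
#include <memory>
#include <string>
#include <utility>

#include "converter/kafka_helper.h"
#include "librdkafka/rdkafkacpp.h"

// Return the binlog offset carried by the last message already written to the
// output topic, or 0 if the topic does not exist yet or holds no messages.
int64_t recover_last_offset(const std::string& brokers, const std::string& topic) {
  auto consumer = std::make_shared<converter::KafkaConsumer>(
      brokers, topic, "gart_consumer", /*partition=*/0,
      RdKafka::Topic::OFFSET_BEGINNING);
  if (!consumer->topic_exist()) {
    consumer->stop();
    return 0;  // nothing written yet, start from the beginning
  }
  std::pair<int64_t, int64_t> wm = consumer->query_watermark_offsets();
  consumer->stop();
  if (wm.second == 0) {
    return 0;  // topic exists but is empty
  }
  consumer->start(wm.second - 1);  // position on the last written message
  RdKafka::Message* msg = consumer->consume(nullptr, 1000);
  std::string line(static_cast<const char*>(msg->payload()), msg->len());
  int64_t offset = std::stoll(line.substr(line.find_last_of('|') + 1));
  consumer->delete_message(msg);
  consumer->stop();
  return offset;
}
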
4 changes: 3 additions & 1 deletion converter/parser.cc
@@ -94,11 +94,12 @@ LogEntry LogEntry::bulk_load_end() {
return entry;
}

string LogEntry::to_string() const {
string LogEntry::to_string(int64_t binlog_offset) const {
string base;
if (op_type == OpType::BULKLOAD_END) {
base = "bulkload_end";
append_str(base, epoch);
append_str(base, binlog_offset);
return base;
}

@@ -133,6 +134,7 @@ string LogEntry::to_string() const {
append_str(base, str);
}

append_str(base, binlog_offset);
return base;
}

2 changes: 1 addition & 1 deletion converter/parser.h
@@ -39,7 +39,7 @@ class LogEntry {

static LogEntry bulk_load_end();

std::string to_string() const;
std::string to_string(int64_t binlog_offset) const;

int get_tx_id() const { return tx_id; }

2 changes: 2 additions & 0 deletions vegito/src/framework/bench_runner.cc
@@ -544,6 +544,8 @@ void Runner::apply_log_to_store_(const string_view& log, int p_id) {
}

sv_vec.erase(sv_vec.begin(), sv_vec.begin() + 1);
// remove the last element of sv_vec, since it holds the binlog offset appended by the converter
sv_vec.pop_back();

if (op == "add_vertex") {
process_add_vertex(sv_vec, graph_stores_[p_id]);
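
Taken together, every converted log line now ends with its binlog offset: LogEntry::to_string appends it, bench_runner.cc pops it before applying the operation, and on restart the converter reads it back from the last message of the output topic to decide where re-publishing resumes. A small sketch of the reader side, assuming append_str joins fields with the '|' separator that binlog_convert.cc splits on:

#include <cstdint>
#include <string>

// The trailing '|'-separated field of a converted log line is its binlog offset
// (assumed field layout, e.g. "bulkload_end|<epoch>|<offset>").
int64_t trailing_binlog_offset(const std::string& line) {
  size_t pos = line.find_last_of('|');
  return std::stoll(line.substr(pos + 1));
}
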
