diff --git a/src/connection.c b/src/connection.c index 67d93a6..6100988 100644 --- a/src/connection.c +++ b/src/connection.c @@ -8,6 +8,7 @@ KafkaFdwGetConnection(KafkaOptions *k_options, rd_kafka_topic_conf_t *topic_conf = NULL; rd_kafka_conf_t * conf; char errstr[KAFKA_MAX_ERR_MSG]; + ListCell *option; /* brokers and topic should be validated just double check */ @@ -20,6 +21,15 @@ KafkaFdwGetConnection(KafkaOptions *k_options, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) elog(ERROR, "%s\n", errstr); + foreach (option, k_options->options) + { + DefElem *def = (DefElem *) lfirst(option); + + if (rd_kafka_conf_set(conf, def->defname + 1, defGetString(def), + errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) + elog(ERROR, "%s\n", errstr); + } + *kafka_handle = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, KAFKA_MAX_ERR_MSG); if (*kafka_handle != NULL) diff --git a/src/kafka_fdw.h b/src/kafka_fdw.h index fee9c35..b6f5d6d 100644 --- a/src/kafka_fdw.h +++ b/src/kafka_fdw.h @@ -150,6 +150,7 @@ typedef struct KafkaOptions bool strict; /* force strict parsing */ bool ignore_junk; /* ignore junk data by setting it to null */ int num_parse_col; /* number of parsable columns */ + List *options; } KafkaOptions; typedef struct ParseOptions diff --git a/src/option.c b/src/option.c index 84a7acf..984125c 100644 --- a/src/option.c +++ b/src/option.c @@ -325,6 +325,9 @@ is_valid_option(const char *option, Oid context) { const struct KafkaFdwOption *opt; + if (option[0] == '#') + return true; + for (opt = valid_options; opt->optname; opt++) { if (context == opt->optcontext && strcmp(opt->optname, option) == 0) @@ -353,7 +356,12 @@ KafkaProcessKafkaOptions(Oid relid, KafkaOptions *kafka_options, List *options) { DefElem *def = (DefElem *) lfirst(option); - if (strcmp(def->defname, "topic") == 0) + if (def->defname[0] == '#') + { + kafka_options->options = lappend(kafka_options->options, def); + } + + else if (strcmp(def->defname, "topic") == 0) { if (kafka_options->topic) ereport(ERROR,