diff --git a/kafka_fdw--0.0.2--0.0.3.sql b/kafka_fdw--0.0.2--0.0.3.sql new file mode 100644 index 0000000..0d4927e --- /dev/null +++ b/kafka_fdw--0.0.2--0.0.3.sql @@ -0,0 +1,12 @@ +DROP FUNCTION kafka_get_watermarks(IN regclass, OUT int, OUT int, OUT int); + +CREATE FUNCTION kafka_get_watermarks( + IN rel regclass, + OUT partition int, + OUT offset_low bigint, + OUT offset_high bigint) +RETURNS SETOF record +AS 'MODULE_PATHNAME', 'kafka_get_watermarks' +LANGUAGE C STRICT; + +ALTER TABLE kafka_fdw_offset_dump ALTER COLUMN "offset" TYPE bigint; diff --git a/kafka_fdw.control b/kafka_fdw.control index e130b1e..b2e3732 100644 --- a/kafka_fdw.control +++ b/kafka_fdw.control @@ -1,5 +1,5 @@ # kafka FDW comment = 'kafka Foreign Data Wrapper for CSV formated messages' -default_version = '0.0.2' +default_version = '0.0.3' module_pathname = '$libdir/kafka_fdw' relocatable = true diff --git a/sql/kafka_fdw.sql b/sql/kafka_fdw.sql index 6c05bdd..f9203bb 100644 --- a/sql/kafka_fdw.sql +++ b/sql/kafka_fdw.sql @@ -1,7 +1,7 @@ CREATE TABLE kafka_fdw_offset_dump( tbloid oid, partition int, - "offset" int, + "offset" bigint, last_fetch timestamp DEFAULT statement_timestamp(), PRIMARY KEY(tbloid, partition) ); @@ -23,18 +23,8 @@ CREATE FOREIGN DATA WRAPPER kafka_fdw CREATE FUNCTION kafka_get_watermarks(IN rel regclass, OUT partition int, - OUT offset_low int, - OUT offset_high int) + OUT offset_low bigint, + OUT offset_high bigint) RETURNS SETOF record AS 'MODULE_PATHNAME', 'kafka_get_watermarks' LANGUAGE C STRICT; - -DO $$ -DECLARE version_num INTEGER; -BEGIN - SELECT current_setting('server_version_num') INTO STRICT version_num; - IF version_num > 90600 THEN - EXECUTE 'ALTER FUNCTION kafka_get_watermarks(regclass) PARALLEL SAFE'; - END IF; -END -$$;