Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Protobuf format Schema Registry #729

Open
wants to merge 16 commits into
base: feature-protobuf-catalog
Choose a base branch
from
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ dependencies {

compileOnly group: 'org.apache.flink', name: 'flink-json', version: flinkVersion
compileOnly group: 'org.apache.flink', name: 'flink-avro', version: flinkVersion
compileOnly group: 'org.apache.flink', name: 'flink-protobuf', version: flinkVersion

testImplementation (group: 'io.pravega', name: 'pravega-standalone', version: pravegaVersion) {
exclude group: 'org.slf4j', module: 'slf4j-api'
Expand All @@ -146,6 +147,7 @@ dependencies {
testImplementation group: 'org.apache.flink', name: 'flink-table-planner_' + flinkScalaVersion, classifier: 'tests', version: flinkVersion
testImplementation group: 'org.apache.flink', name: 'flink-json', version: flinkVersion
testImplementation group: 'org.apache.flink', name: 'flink-avro', version: flinkVersion
testImplementation group: 'org.apache.flink', name: 'flink-protobuf', version: flinkVersion
testImplementation group: 'org.hamcrest', name: 'hamcrest', version: hamcrestVersion
testImplementation group: 'org.testcontainers', name: 'testcontainers', version: testcontainersVersion
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter', version: junit5Version
Expand Down
1 change: 1 addition & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@
<suppress checks="JavadocMethod" files=".+(Test|Tests|ITCase)\.java" />
<suppress checks="IllegalThrows" files=".+(Test|Tests|ITCase)\.java" />
<suppress files="[\\/]generated[\\/]" checks="[a-zA-Z0-9]*"/>
<suppress files="src[\\/]test[\\/]java[\\/]io[\\/]pravega[\\/]connectors[\\/]flink[\\/]formats[\\/]registry[\\/]testProto[\\/].*\.java" checks=".*" />
</suppressions>

Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@
import java.util.Set;

/**
* Table format factory for providing configured instances of Pravega-Registry to Flink RowData {@link
* Table format factory for providing configured instances of Pravega-Registry
* to Flink RowData {@link
* SerializationSchema} and {@link DeserializationSchema}.
*/
public class PravegaRegistryFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory {
Expand All @@ -63,23 +64,32 @@ public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(

final boolean failOnMissingField = formatOptions.get(PravegaRegistryOptions.FAIL_ON_MISSING_FIELD);
final boolean ignoreParseErrors = formatOptions.get(PravegaRegistryOptions.IGNORE_PARSE_ERRORS);

final boolean pbIgnoreParseErrors = formatOptions.get(PravegaRegistryOptions.PB_IGNORE_PARSE_ERRORS);
final String pbMessageClassName = formatOptions.get(PravegaRegistryOptions.PB_MESSAGE_CLASS_NAME);
final boolean pbReadDefaultValues = formatOptions.get(PravegaRegistryOptions.PB_READ_DEFAULT_VALUES);
final String pbWriteNullStringLiterals = formatOptions.get(PravegaRegistryOptions.PB_WRITE_NULL_STRING_LITERAL);

TimestampFormat timestampOption = JsonFormatOptionsUtil.getTimestampFormat(formatOptions);

return new DecodingFormat<DeserializationSchema<RowData>>() {
@Override
public DeserializationSchema<RowData> createRuntimeDecoder(
DynamicTableSource.Context context, DataType producedDatatype) {
final RowType rowType = (RowType) producedDatatype.getLogicalType();
final TypeInformation<RowData> rowDataTypeInfo =
context.createTypeInformation(producedDatatype);
final TypeInformation<RowData> rowDataTypeInfo = context.createTypeInformation(producedDatatype);
return new PravegaRegistryRowDataDeserializationSchema(
rowType,
rowDataTypeInfo,
groupId,
pravegaConfig,
failOnMissingField,
ignoreParseErrors,
timestampOption);
timestampOption,
pbMessageClassName,
pbIgnoreParseErrors,
pbReadDefaultValues,
pbWriteNullStringLiterals);
}

@Override
Expand All @@ -102,10 +112,15 @@ public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
.withSchemaRegistryURI(URI.create(formatOptions.get(PravegaRegistryOptions.URI)));

TimestampFormat timestampOption = JsonFormatOptionsUtil.getTimestampFormat(formatOptions);
final JsonFormatOptions.MapNullKeyMode mapNullKeyMode =
JsonFormatOptionsUtil.getMapNullKeyMode(formatOptions);
final JsonFormatOptions.MapNullKeyMode mapNullKeyMode = JsonFormatOptionsUtil.getMapNullKeyMode(formatOptions);
final String mapNullKeyLiteral = formatOptions.get(PravegaRegistryOptions.MAP_NULL_KEY_LITERAL);
final boolean encodeDecimalAsPlainNumber = formatOptions.get(PravegaRegistryOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER);
final boolean encodeDecimalAsPlainNumber = formatOptions
.get(PravegaRegistryOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER);

final boolean pbIgnoreParseErrors = formatOptions.get(PravegaRegistryOptions.PB_IGNORE_PARSE_ERRORS);
final String pbMessageClassName = formatOptions.get(PravegaRegistryOptions.PB_MESSAGE_CLASS_NAME);
final boolean pbReadDefaultValues = formatOptions.get(PravegaRegistryOptions.PB_READ_DEFAULT_VALUES);
final String pbWriteNullStringLiterals = formatOptions.get(PravegaRegistryOptions.PB_WRITE_NULL_STRING_LITERAL);

return new EncodingFormat<SerializationSchema<RowData>>() {
@Override
Expand All @@ -120,7 +135,11 @@ public SerializationSchema<RowData> createRuntimeEncoder(
timestampOption,
mapNullKeyMode,
mapNullKeyLiteral,
encodeDecimalAsPlainNumber);
encodeDecimalAsPlainNumber,
pbMessageClassName,
pbIgnoreParseErrors,
pbReadDefaultValues,
pbWriteNullStringLiterals);
}

@Override
Expand Down Expand Up @@ -154,6 +173,12 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(PravegaRegistryOptions.MAP_NULL_KEY_MODE);
options.add(PravegaRegistryOptions.MAP_NULL_KEY_LITERAL);
options.add(PravegaRegistryOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER);

options.add(PravegaRegistryOptions.PB_MESSAGE_CLASS_NAME);
options.add(PravegaRegistryOptions.PB_IGNORE_PARSE_ERRORS);
options.add(PravegaRegistryOptions.PB_READ_DEFAULT_VALUES);
options.add(PravegaRegistryOptions.PB_WRITE_NULL_STRING_LITERAL);

options.add(PravegaRegistryOptions.SECURITY_AUTH_TYPE);
options.add(PravegaRegistryOptions.SECURITY_AUTH_TOKEN);
options.add(PravegaRegistryOptions.SECURITY_VALIDATE_HOSTNAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,47 +20,60 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.formats.json.JsonFormatOptions;
import org.apache.flink.formats.protobuf.PbFormatOptions;

public class PravegaRegistryOptions {

public static final ConfigOption<String> URI = ConfigOptions
.key("uri")
.stringType()
.noDefaultValue()
.withDescription("Required URI of Pravega schema registry");
public static final ConfigOption<String> URI = ConfigOptions
.key("uri")
.stringType()
.noDefaultValue()
.withDescription("Required URI of Pravega schema registry");

public static final ConfigOption<String> NAMESPACE = ConfigOptions
.key("namespace")
.stringType()
.noDefaultValue()
.withDescription("Required Pravega schema registry's namespace, should be the same as the Pravega scope name");
public static final ConfigOption<String> NAMESPACE = ConfigOptions
.key("namespace")
.stringType()
.noDefaultValue()
.withDescription(
"Required Pravega schema registry's namespace, should be the same as the Pravega scope name");

public static final ConfigOption<String> GROUP_ID = ConfigOptions
.key("group-id")
.stringType()
.noDefaultValue()
.withDescription("Required Pravega schema registry's groupID, should be the same as the Pravega stream name");
public static final ConfigOption<String> GROUP_ID = ConfigOptions
.key("group-id")
.stringType()
.noDefaultValue()
.withDescription(
"Required Pravega schema registry's groupID, should be the same as the Pravega stream name");

public static final ConfigOption<String> FORMAT = ConfigOptions
.key("format")
.stringType()
.defaultValue("Avro")
.withDescription("Optional serialization format for Pravega catalog. Valid enumerations are ['Avro'(default), 'Json']");
public static final ConfigOption<String> FORMAT = ConfigOptions
.key("format")
.stringType()
.defaultValue("Avro")
.withDescription(
"Optional serialization format for Pravega catalog. Valid enumerations are ['Avro'(default), 'Json']");

// Pravega security options
public static final ConfigOption<String> SECURITY_AUTH_TYPE = PravegaOptions.SECURITY_AUTH_TYPE;
public static final ConfigOption<String> SECURITY_AUTH_TOKEN = PravegaOptions.SECURITY_AUTH_TOKEN;
public static final ConfigOption<Boolean> SECURITY_VALIDATE_HOSTNAME = PravegaOptions.SECURITY_VALIDATE_HOSTNAME;
public static final ConfigOption<String> SECURITY_TRUST_STORE = PravegaOptions.SECURITY_TRUST_STORE;
// Pravega security options
public static final ConfigOption<String> SECURITY_AUTH_TYPE = PravegaOptions.SECURITY_AUTH_TYPE;
public static final ConfigOption<String> SECURITY_AUTH_TOKEN = PravegaOptions.SECURITY_AUTH_TOKEN;
public static final ConfigOption<Boolean> SECURITY_VALIDATE_HOSTNAME = PravegaOptions.SECURITY_VALIDATE_HOSTNAME;
public static final ConfigOption<String> SECURITY_TRUST_STORE = PravegaOptions.SECURITY_TRUST_STORE;

// --------------------------------------------------------------------------------------------
// Json Options
// --------------------------------------------------------------------------------------------
// --------------------------------------------------------------------------------------------
// Json Options
// --------------------------------------------------------------------------------------------

public static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD = JsonFormatOptions.FAIL_ON_MISSING_FIELD;
public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS = JsonFormatOptions.IGNORE_PARSE_ERRORS;
public static final ConfigOption<String> TIMESTAMP_FORMAT = JsonFormatOptions.TIMESTAMP_FORMAT;
public static final ConfigOption<String> MAP_NULL_KEY_MODE = JsonFormatOptions.MAP_NULL_KEY_MODE;
public static final ConfigOption<String> MAP_NULL_KEY_LITERAL = JsonFormatOptions.MAP_NULL_KEY_LITERAL;
public static final ConfigOption<Boolean> ENCODE_DECIMAL_AS_PLAIN_NUMBER = JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
public static final ConfigOption<Boolean> FAIL_ON_MISSING_FIELD = JsonFormatOptions.FAIL_ON_MISSING_FIELD;
public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS = JsonFormatOptions.IGNORE_PARSE_ERRORS;
public static final ConfigOption<String> TIMESTAMP_FORMAT = JsonFormatOptions.TIMESTAMP_FORMAT;
public static final ConfigOption<String> MAP_NULL_KEY_MODE = JsonFormatOptions.MAP_NULL_KEY_MODE;
public static final ConfigOption<String> MAP_NULL_KEY_LITERAL = JsonFormatOptions.MAP_NULL_KEY_LITERAL;
public static final ConfigOption<Boolean> ENCODE_DECIMAL_AS_PLAIN_NUMBER = JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;

// --------------------------------------------------------------------------------------------
// Protobuf Options
// --------------------------------------------------------------------------------------------

public static final ConfigOption<String> PB_MESSAGE_CLASS_NAME = PbFormatOptions.MESSAGE_CLASS_NAME;
public static final ConfigOption<Boolean> PB_IGNORE_PARSE_ERRORS = PbFormatOptions.IGNORE_PARSE_ERRORS;
public static final ConfigOption<Boolean> PB_READ_DEFAULT_VALUES = PbFormatOptions.READ_DEFAULT_VALUES;
public static final ConfigOption<String> PB_WRITE_NULL_STRING_LITERAL = PbFormatOptions.WRITE_NULL_STRING_LITERAL;
}
Loading