PB_WRITE_NULL_STRING_LITERAL = PbFormatOptions.WRITE_NULL_STRING_LITERAL;
}
diff --git a/src/main/java/io/pravega/connectors/flink/formats/registry/PravegaRegistryRowDataDeserializationSchema.java b/src/main/java/io/pravega/connectors/flink/formats/registry/PravegaRegistryRowDataDeserializationSchema.java
index c0bb4db0..472bd708 100644
--- a/src/main/java/io/pravega/connectors/flink/formats/registry/PravegaRegistryRowDataDeserializationSchema.java
+++ b/src/main/java/io/pravega/connectors/flink/formats/registry/PravegaRegistryRowDataDeserializationSchema.java
@@ -16,8 +16,10 @@
package io.pravega.connectors.flink.formats.registry;
+import com.google.protobuf.GeneratedMessageV3;
import io.pravega.client.stream.Serializer;
import io.pravega.connectors.flink.PravegaConfig;
+import io.pravega.connectors.flink.util.MessageToRowConverter;
import io.pravega.connectors.flink.util.SchemaRegistryUtils;
import io.pravega.schemaregistry.client.SchemaRegistryClient;
import io.pravega.schemaregistry.client.SchemaRegistryClientConfig;
@@ -25,6 +27,7 @@
import io.pravega.schemaregistry.contract.data.SchemaInfo;
import io.pravega.schemaregistry.contract.data.SerializationFormat;
import io.pravega.schemaregistry.serializer.avro.schemas.AvroSchema;
+import io.pravega.schemaregistry.serializer.protobuf.schemas.ProtobufSchema;
import io.pravega.schemaregistry.serializer.shared.impl.AbstractDeserializer;
import io.pravega.schemaregistry.serializer.shared.impl.EncodingCache;
import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig;
@@ -36,6 +39,8 @@
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonToRowDataConverters;
+import org.apache.flink.formats.protobuf.PbFormatConfig;
+import org.apache.flink.formats.protobuf.PbFormatConfig.PbFormatConfigBuilder;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -51,12 +56,17 @@
import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
- * Deserialization schema from Pravega Schema Registry to Flink Table/SQL internal data structure {@link RowData}.
+ * Deserialization schema from Pravega Schema Registry to Flink Table/SQL
+ * internal data structure {@link RowData}.
*
- * <p>Deserializes a <code>byte[]</code> message as a Pravega Schema Registry and reads the specified fields.
+ * <p>
+ * Deserializes a <code>byte[]</code> message as a Pravega Schema Registry and
+ * reads the specified fields.
*
- * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
+ * <p>
+ * Failures during deserialization are forwarded as wrapped IOExceptions.
*/
public class PravegaRegistryRowDataDeserializationSchema implements DeserializationSchema<RowData> {
private static final long serialVersionUID = 1L;
@@ -103,12 +113,27 @@ public class PravegaRegistryRowDataDeserializationSchema implements DeserializationSchema<RowData> {
/** Flag indicating whether to fail if a field is missing. */
private final boolean failOnMissingField;
- /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
+ /**
+ * Flag indicating whether to ignore invalid fields/rows (default: throw an
+ * exception).
+ */
private final boolean ignoreParseErrors;
/** Timestamp format specification which is used to parse timestamp. */
private final TimestampFormat timestampFormat;
+ // --------------------------------------------------------------------------------------------
+ // Protobuf fields
+ // --------------------------------------------------------------------------------------------
+
+ private transient MessageToRowConverter messageToRowConverter;
+
+ private final String pbMessageClassName;
+ private final boolean pbIgnoreParseErrors;
+ private final boolean pbReadDefaultValues;
+ private final String pbWriteNullStringLiterals;
+
public PravegaRegistryRowDataDeserializationSchema(
RowType rowType,
TypeInformation<RowData> typeInfo,
@@ -116,8 +141,11 @@ public PravegaRegistryRowDataDeserializationSchema(
PravegaConfig pravegaConfig,
boolean failOnMissingField,
boolean ignoreParseErrors,
- TimestampFormat timestampFormat
- ) {
+ TimestampFormat timestampFormat,
+ String pbMessageClassName,
+ boolean pbIgnoreParseErrors,
+ boolean pbReadDefaultValues,
+ String pbWriteNullStringLiterals) {
if (ignoreParseErrors && failOnMissingField) {
throw new IllegalArgumentException(
"JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled.");
@@ -130,13 +158,17 @@ public PravegaRegistryRowDataDeserializationSchema(
this.failOnMissingField = failOnMissingField;
this.ignoreParseErrors = ignoreParseErrors;
this.timestampFormat = timestampFormat;
+ this.pbMessageClassName = pbMessageClassName;
+ this.pbIgnoreParseErrors = pbIgnoreParseErrors;
+ this.pbReadDefaultValues = pbReadDefaultValues;
+ this.pbWriteNullStringLiterals = pbWriteNullStringLiterals;
}
@SuppressWarnings("unchecked")
@Override
public void open(InitializationContext context) throws Exception {
- SchemaRegistryClientConfig schemaRegistryClientConfig =
- SchemaRegistryUtils.getSchemaRegistryClientConfig(pravegaConfig);
+ SchemaRegistryClientConfig schemaRegistryClientConfig = SchemaRegistryUtils
+ .getSchemaRegistryClientConfig(pravegaConfig);
SchemaRegistryClient schemaRegistryClient = SchemaRegistryClientFactory.withNamespace(namespace,
schemaRegistryClientConfig);
SerializerConfig config = SerializerConfig.builder()
@@ -153,8 +185,7 @@ public void open(InitializationContext context) throws Exception {
break;
case Json:
ObjectMapper objectMapper = new ObjectMapper();
- boolean hasDecimalType =
- LogicalTypeChecks.hasNested(rowType, t -> t instanceof DecimalType);
+ boolean hasDecimalType = LogicalTypeChecks.hasNested(rowType, t -> t instanceof DecimalType);
if (hasDecimalType) {
objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
}
@@ -166,6 +197,11 @@ public void open(InitializationContext context) throws Exception {
config.isWriteEncodingHeader(),
objectMapper);
break;
+ case Protobuf:
+ ProtobufSchema<GeneratedMessageV3> protobufSchema = ProtobufSchema
+ .of((Class<GeneratedMessageV3>) Class.forName(pbMessageClassName));
+ deserializer = SerializerFactory.protobufDeserializer(config, protobufSchema);
+ break;
default:
throw new NotImplementedException("Not supporting deserialization format");
}
@@ -190,20 +226,30 @@ public Object deserializeToObject(byte[] message) {
return deserializer.deserialize(ByteBuffer.wrap(message));
}
- public RowData convertToRowData(Object message) {
+ public RowData convertToRowData(Object message) throws Exception {
Object o;
switch (serializationFormat) {
case Avro:
- AvroToRowDataConverters.AvroToRowDataConverter avroConverter =
- AvroToRowDataConverters.createRowConverter(rowType);
+ AvroToRowDataConverters.AvroToRowDataConverter avroConverter = AvroToRowDataConverters
+ .createRowConverter(rowType);
o = avroConverter.convert(message);
break;
case Json:
- JsonToRowDataConverters.JsonToRowDataConverter jsonConverter =
- new JsonToRowDataConverters(failOnMissingField, ignoreParseErrors, timestampFormat)
- .createConverter(checkNotNull(rowType));
+ JsonToRowDataConverters.JsonToRowDataConverter jsonConverter = new JsonToRowDataConverters(
+ failOnMissingField, ignoreParseErrors, timestampFormat)
+ .createConverter(checkNotNull(rowType));
o = jsonConverter.convert((JsonNode) message);
break;
+ case Protobuf:
+ PbFormatConfig pbFormatConfig = new PbFormatConfigBuilder()
+ .messageClassName(pbMessageClassName)
+ .ignoreParseErrors(pbIgnoreParseErrors)
+ .readDefaultValues(pbReadDefaultValues)
+ .writeNullStringLiterals(pbWriteNullStringLiterals)
+ .build();
+ messageToRowConverter = new MessageToRowConverter(rowType, pbFormatConfig);
+ o = messageToRowConverter.convertMessageToRow(message);
+ break;
default:
throw new NotImplementedException("Not supporting deserialization format");
}
@@ -214,16 +260,16 @@ private static class FlinkJsonGenericDeserializer extends AbstractDeserializer<JsonNode>
diff --git a/src/main/java/io/pravega/connectors/flink/formats/registry/PravegaRegistryRowDataSerializationSchema.java b/src/main/java/io/pravega/connectors/flink/formats/registry/PravegaRegistryRowDataSerializationSchema.java
--- a/src/main/java/io/pravega/connectors/flink/formats/registry/PravegaRegistryRowDataSerializationSchema.java
+++ b/src/main/java/io/pravega/connectors/flink/formats/registry/PravegaRegistryRowDataSerializationSchema.java
- * <p>Serializes the input Flink object into GenericRecord and converts it into <code>byte[]</code>.
+ * <p>
+ * Serializes the input Flink object into GenericRecord and converts it into
+ * <code>byte[]</code>.
*
- * <p>Result <code>byte[]</code> messages can be deserialized using {@link
+ * <p>
+ * Result <code>byte[]</code> messages can be deserialized using {@link
* PravegaRegistryRowDataDeserializationSchema}.
*/
public class PravegaRegistryRowDataSerializationSchema implements SerializationSchema<RowData> {
@@ -113,6 +124,16 @@ public class PravegaRegistryRowDataSerializationSchema implements SerializationSchema<RowData> {
/** Flag indicating whether to serialize all decimals as plain numbers. */
private final boolean encodeDecimalAsPlainNumber;
+ // --------------------------------------------------------------------------------------------
+ // Protobuf fields
+ // --------------------------------------------------------------------------------------------
+
+ private final String pbMessageClassName;
+ private final boolean pbIgnoreParseErrors;
+ private final boolean pbReadDefaultValues;
+ private final String pbWriteNullStringLiterals;
+
public PravegaRegistryRowDataSerializationSchema(
RowType rowType,
String groupId,
@@ -121,7 +142,11 @@ public PravegaRegistryRowDataSerializationSchema(
TimestampFormat timestampOption,
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
String mapNullKeyLiteral,
- boolean encodeDecimalAsPlainNumber) {
+ boolean encodeDecimalAsPlainNumber,
+ String pbMessageClassName,
+ boolean pbIgnoreParseErrors,
+ boolean pbReadDefaultValues,
+ String pbWriteNullStringLiterals) {
this.rowType = rowType;
this.serializer = null;
this.namespace = pravegaConfig.getDefaultScope();
@@ -132,13 +157,17 @@ public PravegaRegistryRowDataSerializationSchema(
this.mapNullKeyMode = mapNullKeyMode;
this.mapNullKeyLiteral = mapNullKeyLiteral;
this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
+ this.pbMessageClassName = pbMessageClassName;
+ this.pbIgnoreParseErrors = pbIgnoreParseErrors;
+ this.pbReadDefaultValues = pbReadDefaultValues;
+ this.pbWriteNullStringLiterals = pbWriteNullStringLiterals;
}
@SuppressWarnings("unchecked")
@Override
public void open(InitializationContext context) throws Exception {
- SchemaRegistryClientConfig schemaRegistryClientConfig =
- SchemaRegistryUtils.getSchemaRegistryClientConfig(pravegaConfig);
+ SchemaRegistryClientConfig schemaRegistryClientConfig = SchemaRegistryUtils
+ .getSchemaRegistryClientConfig(pravegaConfig);
SchemaRegistryClient schemaRegistryClient = SchemaRegistryClientFactory.withNamespace(namespace,
schemaRegistryClientConfig);
SerializerConfig config = SerializerConfig.builder()
@@ -162,6 +191,11 @@ public void open(InitializationContext context) throws Exception {
config.isRegisterSchema(),
config.isWriteEncodingHeader());
break;
+ case Protobuf:
+ ProtobufSchema<GeneratedMessageV3> protobufSchema = ProtobufSchema
+ .of((Class<GeneratedMessageV3>) Class.forName(pbMessageClassName));
+ serializer = SerializerFactory.protobufSerializer(config, protobufSchema);
+ break;
default:
throw new NotImplementedException("Not supporting deserialization format");
}
@@ -176,6 +210,8 @@ public byte[] serialize(RowData row) {
return convertToByteArray(serializeToGenericRecord(row));
case Json:
return convertToByteArray(serializaToJsonNode(row));
+ case Protobuf:
+ return convertToByteArray(serializeToMessage(row));
default:
throw new NotImplementedException("Not supporting deserialization format");
}
@@ -185,8 +221,8 @@ public byte[] serialize(RowData row) {
}
public GenericRecord serializeToGenericRecord(RowData row) {
- RowDataToAvroConverters.RowDataToAvroConverter runtimeConverter =
- RowDataToAvroConverters.createConverter(rowType);
+ RowDataToAvroConverters.RowDataToAvroConverter runtimeConverter = RowDataToAvroConverters
+ .createConverter(rowType);
return (GenericRecord) runtimeConverter.convert(avroSchema, row);
}
@@ -200,22 +236,36 @@ public JsonNode serializaToJsonNode(RowData row) {
return runtimeConverter.convert(mapper, node, row);
}
+ public AbstractMessage serializeToMessage(RowData row) throws Exception {
+ PbFormatConfig pbConfig = new PbFormatConfigBuilder()
+ .ignoreParseErrors(pbIgnoreParseErrors)
+ .readDefaultValues(pbReadDefaultValues)
+ .writeNullStringLiterals(pbWriteNullStringLiterals)
+ .messageClassName(pbMessageClassName)
+ .build();
+ RowToMessageConverter runtimeConverter = new RowToMessageConverter(rowType, pbConfig);
+ return runtimeConverter.convertRowToProtoMessage(row);
+ }
+
@SuppressWarnings("unchecked")
public byte[] convertToByteArray(Object message) {
- return serializer.serialize(message).array();
+ return FlinkPravegaUtils
+ .byteBufferToArray(serializer.serialize(message));
}
@VisibleForTesting
protected static class FlinkJsonSerializer extends AbstractSerializer<JsonNode> {
private final ObjectMapper objectMapper;
+
public FlinkJsonSerializer(String groupId, SchemaRegistryClient client, JSONSchema schema,
- Encoder encoder, boolean registerSchema, boolean encodeHeader) {
+ Encoder encoder, boolean registerSchema, boolean encodeHeader) {
super(groupId, client, schema, encoder, registerSchema, encodeHeader);
objectMapper = new ObjectMapper();
}
@Override
- protected void serialize(JsonNode jsonNode, SchemaInfo schemaInfo, OutputStream outputStream) throws IOException {
+ protected void serialize(JsonNode jsonNode, SchemaInfo schemaInfo, OutputStream outputStream)
+ throws IOException {
objectMapper.writeValue(outputStream, jsonNode);
outputStream.flush();
}
diff --git a/src/main/java/io/pravega/connectors/flink/util/MessageToRowConverter.java b/src/main/java/io/pravega/connectors/flink/util/MessageToRowConverter.java
new file mode 100644
index 00000000..1bd4d0d3
--- /dev/null
+++ b/src/main/java/io/pravega/connectors/flink/util/MessageToRowConverter.java
@@ -0,0 +1,123 @@
+/**
+ * Copyright Pravega Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pravega.connectors.flink.util;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.FileDescriptor.Syntax;
+import org.apache.flink.formats.protobuf.PbCodegenException;
+import org.apache.flink.formats.protobuf.PbConstant;
+import org.apache.flink.formats.protobuf.PbFormatConfig;
+import org.apache.flink.formats.protobuf.PbFormatContext;
+import org.apache.flink.formats.protobuf.deserialize.PbCodegenDeserializeFactory;
+import org.apache.flink.formats.protobuf.deserialize.PbCodegenDeserializer;
+import org.apache.flink.formats.protobuf.util.PbCodegenAppender;
+import org.apache.flink.formats.protobuf.util.PbCodegenUtils;
+import org.apache.flink.formats.protobuf.util.PbFormatUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * {@link MessageToRowConverter} can convert protobuf message data to flink row
+ * data by codegen process.
+ */
+public class MessageToRowConverter {
+ private static final Logger LOG = LoggerFactory.getLogger(MessageToRowConverter.class);
+ private final Method decodeMethod;
+
+ public MessageToRowConverter(RowType rowType, PbFormatConfig formatConfig)
+ throws PbCodegenException {
+ try {
+ Descriptors.Descriptor descriptor = PbFormatUtils.getDescriptor(formatConfig.getMessageClassName());
+ Class<?> messageClass = Class.forName(
+ formatConfig.getMessageClassName(),
+ true,
+ Thread.currentThread().getContextClassLoader());
+ String fullMessageClassName = PbFormatUtils.getFullJavaName(descriptor);
+ if (descriptor.getFile().getSyntax() == Syntax.PROTO3) {
+ // pb3 always read default values
+ formatConfig = new PbFormatConfig(
+ formatConfig.getMessageClassName(),
+ formatConfig.isIgnoreParseErrors(),
+ true,
+ formatConfig.getWriteNullStringLiterals());
+ }
+ PbCodegenAppender codegenAppender = new PbCodegenAppender();
+ PbFormatContext pbFormatContext = new PbFormatContext(formatConfig);
+ String uuid = UUID.randomUUID().toString().replaceAll("\\-", "");
+ String generatedClassName = "GeneratedProtoToRow_" + uuid;
+ String generatedPackageName = MessageToRowConverter.class.getPackage().getName();
+ codegenAppender.appendLine("package " + generatedPackageName);
+ codegenAppender.appendLine("import " + RowData.class.getName());
+ codegenAppender.appendLine("import " + ArrayData.class.getName());
+ codegenAppender.appendLine("import " + BinaryStringData.class.getName());
+ codegenAppender.appendLine("import " + GenericRowData.class.getName());
+ codegenAppender.appendLine("import " + GenericMapData.class.getName());
+ codegenAppender.appendLine("import " + GenericArrayData.class.getName());
+ codegenAppender.appendLine("import " + ArrayList.class.getName());
+ codegenAppender.appendLine("import " + List.class.getName());
+ codegenAppender.appendLine("import " + Map.class.getName());
+ codegenAppender.appendLine("import " + HashMap.class.getName());
+ codegenAppender.appendLine("import " + ByteString.class.getName());
+
+ codegenAppender.appendSegment("public class " + generatedClassName + "{");
+ codegenAppender.appendSegment(
+ "public static RowData "
+ + PbConstant.GENERATED_DECODE_METHOD
+ + "("
+ + fullMessageClassName
+ + " message){");
+ codegenAppender.appendLine("RowData rowData=null");
+ PbCodegenDeserializer codegenDes = PbCodegenDeserializeFactory.getPbCodegenTopRowDes(
+ descriptor, rowType, pbFormatContext);
+ String genCode = codegenDes.codegen("rowData", "message", 0);
+ codegenAppender.appendSegment(genCode);
+ codegenAppender.appendLine("return rowData");
+ codegenAppender.appendSegment("}");
+ codegenAppender.appendSegment("}");
+
+ String printCode = codegenAppender.printWithLineNumber();
+ LOG.debug("Protobuf decode codegen: \n" + printCode);
+ Class generatedClass = PbCodegenUtils.compileClass(
+ Thread.currentThread().getContextClassLoader(),
+ generatedPackageName + "." + generatedClassName,
+ codegenAppender.code());
+ decodeMethod = generatedClass.getMethod(PbConstant.GENERATED_DECODE_METHOD, messageClass);
+ } catch (Exception ex) {
+ throw new PbCodegenException(ex);
+ }
+ }
+
+ public RowData convertMessageToRow(Object messageObj) throws Exception {
+ return (RowData) decodeMethod.invoke(null, messageObj);
+ }
+}
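
A minimal usage sketch for the converter above, mirroring what the Protobuf branch of convertToRowData does; the message class name is a placeholder:

```java
import org.apache.flink.formats.protobuf.PbFormatConfig;
import org.apache.flink.formats.protobuf.PbFormatConfig.PbFormatConfigBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

class DecodeSketch {
    // Decodes one deserialized protobuf message into Flink's internal RowData.
    static RowData decode(RowType rowType, Object protobufMessage) throws Exception {
        PbFormatConfig config = new PbFormatConfigBuilder()
                .messageClassName("com.example.proto.MyMessage") // hypothetical generated class
                .ignoreParseErrors(false)
                .readDefaultValues(false) // proto3 descriptors are forced to true internally
                .writeNullStringLiterals("null")
                .build();
        // Codegen and compilation happen once in the constructor, so the
        // converter should be built once and reused across records.
        MessageToRowConverter converter = new MessageToRowConverter(rowType, config);
        return converter.convertMessageToRow(protobufMessage);
    }
}
```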
diff --git a/src/main/java/io/pravega/connectors/flink/util/RowToMessageConverter.java b/src/main/java/io/pravega/connectors/flink/util/RowToMessageConverter.java
new file mode 100644
index 00000000..7826769d
--- /dev/null
+++ b/src/main/java/io/pravega/connectors/flink/util/RowToMessageConverter.java
@@ -0,0 +1,108 @@
+/**
+ * Copyright Pravega Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pravega.connectors.flink.util;
+
+import com.google.protobuf.AbstractMessage;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors;
+import org.apache.flink.formats.protobuf.PbCodegenException;
+import org.apache.flink.formats.protobuf.PbConstant;
+import org.apache.flink.formats.protobuf.PbFormatConfig;
+import org.apache.flink.formats.protobuf.PbFormatContext;
+import org.apache.flink.formats.protobuf.serialize.PbCodegenSerializeFactory;
+import org.apache.flink.formats.protobuf.serialize.PbCodegenSerializer;
+import org.apache.flink.formats.protobuf.util.PbCodegenAppender;
+import org.apache.flink.formats.protobuf.util.PbCodegenUtils;
+import org.apache.flink.formats.protobuf.util.PbFormatUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * {@link RowToMessageConverter} can convert flink row data to binary protobuf
+ * message data by codegen process.
+ */
+public class RowToMessageConverter {
+ private static final Logger LOG = LoggerFactory.getLogger(RowToMessageConverter.class);
+ private final Method encodeMethod;
+
+ public RowToMessageConverter(RowType rowType, PbFormatConfig formatConfig)
+ throws PbCodegenException {
+ try {
+ Descriptors.Descriptor descriptor = PbFormatUtils
+ .getDescriptor(formatConfig.getMessageClassName());
+ PbFormatContext formatContext = new PbFormatContext(formatConfig);
+
+ PbCodegenAppender codegenAppender = new PbCodegenAppender(0);
+ String uuid = UUID.randomUUID().toString().replaceAll("\\-", "");
+ String generatedClassName = "GeneratedRowToProto_" + uuid;
+ String generatedPackageName = RowToMessageConverter.class.getPackage().getName();
+ codegenAppender.appendLine("package " + generatedPackageName);
+ codegenAppender.appendLine("import " + AbstractMessage.class.getName());
+ codegenAppender.appendLine("import " + Descriptors.class.getName());
+ codegenAppender.appendLine("import " + RowData.class.getName());
+ codegenAppender.appendLine("import " + ArrayData.class.getName());
+ codegenAppender.appendLine("import " + StringData.class.getName());
+ codegenAppender.appendLine("import " + ByteString.class.getName());
+ codegenAppender.appendLine("import " + List.class.getName());
+ codegenAppender.appendLine("import " + ArrayList.class.getName());
+ codegenAppender.appendLine("import " + Map.class.getName());
+ codegenAppender.appendLine("import " + HashMap.class.getName());
+
+ codegenAppender.begin("public class " + generatedClassName + "{");
+ codegenAppender.begin(
+ "public static AbstractMessage "
+ + PbConstant.GENERATED_ENCODE_METHOD
+ + "(RowData rowData){");
+ codegenAppender.appendLine("AbstractMessage message = null");
+ PbCodegenSerializer codegenSer = PbCodegenSerializeFactory.getPbCodegenTopRowSer(
+ descriptor, rowType, formatContext);
+ String genCode = codegenSer.codegen("message", "rowData", codegenAppender.currentIndent());
+ codegenAppender.appendSegment(genCode);
+ codegenAppender.appendLine("return message");
+ codegenAppender.end("}");
+ codegenAppender.end("}");
+
+ String printCode = codegenAppender.printWithLineNumber();
+ LOG.debug("Protobuf encode codegen: \n" + printCode);
+ Class generatedClass = PbCodegenUtils.compileClass(
+ Thread.currentThread().getContextClassLoader(),
+ generatedPackageName + "." + generatedClassName,
+ codegenAppender.code());
+ encodeMethod = generatedClass.getMethod(PbConstant.GENERATED_ENCODE_METHOD, RowData.class);
+ } catch (Exception ex) {
+ throw new PbCodegenException(ex);
+ }
+ }
+
+ public AbstractMessage convertRowToProtoMessage(RowData rowData) throws Exception {
+ AbstractMessage message = (AbstractMessage) encodeMethod.invoke(null, rowData);
+ return message;
+ }
+}
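
The encode direction composes with the decoder above into a simple round trip, under the same assumptions as the previous sketch:

```java
import com.google.protobuf.AbstractMessage;
import org.apache.flink.formats.protobuf.PbFormatConfig;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

class RoundTripSketch {
    // RowData -> protobuf message -> RowData; useful as a quick sanity check.
    static RowData roundTrip(RowType rowType, PbFormatConfig config, RowData row) throws Exception {
        RowToMessageConverter encoder = new RowToMessageConverter(rowType, config);
        MessageToRowConverter decoder = new MessageToRowConverter(rowType, config);
        AbstractMessage message = encoder.convertRowToProtoMessage(row);
        return decoder.convertMessageToRow(message);
    }
}
```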
diff --git a/src/test/java/io/pravega/connectors/flink/formats/registry/PravegaRegistryFormatFactoryTest.java b/src/test/java/io/pravega/connectors/flink/formats/registry/PravegaRegistryFormatFactoryTest.java
index c11829ec..c10b8660 100644
--- a/src/test/java/io/pravega/connectors/flink/formats/registry/PravegaRegistryFormatFactoryTest.java
+++ b/src/test/java/io/pravega/connectors/flink/formats/registry/PravegaRegistryFormatFactoryTest.java
@@ -53,127 +53,128 @@
/** Tests for the {@link PravegaRegistryFormatFactory}. */
public class PravegaRegistryFormatFactoryTest extends TestLogger {
- private static final ResolvedSchema RESOLVED_SCHEMA =
- ResolvedSchema.of(
- Column.physical("a", DataTypes.STRING()),
- Column.physical("b", DataTypes.INT()),
- Column.physical("c", DataTypes.BOOLEAN()));
-
- private static final RowType ROW_TYPE = (RowType) RESOLVED_SCHEMA.toPhysicalRowDataType().getLogicalType();
-
- private static final String SCOPE = "test-scope";
- private static final String STREAM = "test-stream";
- private static final String TOKEN = RandomStringUtils.randomAlphabetic(10);
- private static final String TRUST_STORE = RandomStringUtils.randomAlphabetic(10);
- private static final PravegaConfig PRAVEGA_CONFIG = PravegaConfig.fromDefaults().
- withSchemaRegistryURI(URI.create("http://localhost:10092")).
- withDefaultScope(SCOPE).
- withCredentials(new FlinkPravegaUtils.SimpleCredentials("Basic", TOKEN)).
- withHostnameValidation(false).
- withTrustStore(TRUST_STORE);
-
- private static final SerializationFormat SERIALIZATIONFORMAT = SerializationFormat.Avro;
- private static final boolean FAIL_ON_MISSING_FIELD = false;
- private static final boolean IGNORE_PARSE_ERRORS = false;
- private static final TimestampFormat TIMESTAMP_FORMAT = TimestampFormat.SQL;
- private static final JsonFormatOptions.MapNullKeyMode MAP_NULL_KEY_MODE =
- JsonFormatOptions.MapNullKeyMode.FAIL;
- private static final String MAP_NULL_KEY_LITERAL = "null";
- private static final boolean ENCODE_DECIMAL_AS_PLAIN_NUMBER = false;
-
- @Test
- public void testSeDeSchema() {
- final PravegaRegistryRowDataDeserializationSchema expectedDeser =
- new PravegaRegistryRowDataDeserializationSchema(
- ROW_TYPE,
- InternalTypeInfo.of(ROW_TYPE),
- STREAM,
- PRAVEGA_CONFIG,
- FAIL_ON_MISSING_FIELD,
- IGNORE_PARSE_ERRORS,
- TIMESTAMP_FORMAT);
-
- final Map<String, String> options = getAllOptions();
-
- final DynamicTableSource actualSource = createTableSource(options);
- assertThat(actualSource instanceof TestDynamicTableFactory.DynamicTableSourceMock).isTrue();
- TestDynamicTableFactory.DynamicTableSourceMock sourceMock =
- (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
-
- DeserializationSchema<RowData> actualDeser =
- sourceMock.valueFormat.createRuntimeDecoder(
- ScanRuntimeProviderContext.INSTANCE, RESOLVED_SCHEMA.toPhysicalRowDataType());
-
- assertThat(actualDeser).isEqualTo(expectedDeser);
-
- final PravegaRegistryRowDataSerializationSchema expectedSer =
- new PravegaRegistryRowDataSerializationSchema(
- ROW_TYPE,
- STREAM,
- SERIALIZATIONFORMAT,
- PRAVEGA_CONFIG,
- TIMESTAMP_FORMAT,
- MAP_NULL_KEY_MODE,
- MAP_NULL_KEY_LITERAL,
- ENCODE_DECIMAL_AS_PLAIN_NUMBER);
-
- final DynamicTableSink actualSink = createTableSink(options);
- assertThat(actualSink instanceof TestDynamicTableFactory.DynamicTableSinkMock).isTrue();
- TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
- (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
-
- SerializationSchema<RowData> actualSer =
- sinkMock.valueFormat.createRuntimeEncoder(null, RESOLVED_SCHEMA.toPhysicalRowDataType());
-
- assertThat(actualSer).isEqualTo(expectedSer);
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- private Map<String, String> getAllOptions() {
- final Map<String, String> options = new HashMap<>();
- options.put("connector", "test-connector");
- options.put("target", "MyTarget");
-
- options.put("format", PravegaRegistryFormatFactory.IDENTIFIER);
- options.put("pravega-registry.uri", "http://localhost:10092");
- options.put("pravega-registry.namespace", SCOPE);
- options.put("pravega-registry.group-id", STREAM);
- options.put("pravega-registry.format", SERIALIZATIONFORMAT.name());
- options.put("pravega-registry.fail-on-missing-field", "false");
- options.put("pravega-registry.ignore-parse-errors", "false");
- options.put("pravega-registry.timestamp-format.standard", "SQL");
- options.put("pravega-registry.map-null-key.mode", "FAIL");
- options.put("pravega-registry.map-null-key.literal", "null");
-
- options.put("pravega-registry.security.auth-type", "Basic");
- options.put("pravega-registry.security.auth-token", TOKEN);
- options.put("pravega-registry.security.validate-hostname", "false");
- options.put("pravega-registry.security.trust-store", TRUST_STORE);
- return options;
- }
-
- private static DynamicTableSource createTableSource(Map<String, String> options) {
- CatalogTable table = new CatalogTableImpl(TableSchema.fromResolvedSchema(RESOLVED_SCHEMA), options, "scanTable");
- return FactoryUtil.createTableSource(
- null,
- ObjectIdentifier.of("default", "default", "scanTable"),
- new ResolvedCatalogTable(table, RESOLVED_SCHEMA),
- new Configuration(),
- Thread.currentThread().getContextClassLoader(),
- false);
- }
-
- private static DynamicTableSink createTableSink(Map<String, String> options) {
- CatalogTable table = new CatalogTableImpl(TableSchema.fromResolvedSchema(RESOLVED_SCHEMA), options, "scanTable");
- return FactoryUtil.createTableSink(
- null,
- ObjectIdentifier.of("default", "default", "scanTable"),
- new ResolvedCatalogTable(table, RESOLVED_SCHEMA),
- new Configuration(),
- Thread.currentThread().getContextClassLoader(),
- false);
- }
+ private static final ResolvedSchema RESOLVED_SCHEMA = ResolvedSchema.of(
+ Column.physical("a", DataTypes.STRING()),
+ Column.physical("b", DataTypes.INT()),
+ Column.physical("c", DataTypes.BOOLEAN()));
+
+ private static final RowType ROW_TYPE = (RowType) RESOLVED_SCHEMA.toPhysicalRowDataType().getLogicalType();
+
+ private static final String SCOPE = "test-scope";
+ private static final String STREAM = "test-stream";
+ private static final String TOKEN = RandomStringUtils.randomAlphabetic(10);
+ private static final String TRUST_STORE = RandomStringUtils.randomAlphabetic(10);
+ private static final PravegaConfig PRAVEGA_CONFIG = PravegaConfig.fromDefaults()
+ .withSchemaRegistryURI(URI.create("http://localhost:10092")).withDefaultScope(SCOPE)
+ .withCredentials(new FlinkPravegaUtils.SimpleCredentials("Basic", TOKEN))
+ .withHostnameValidation(false).withTrustStore(TRUST_STORE);
+
+ private static final SerializationFormat SERIALIZATIONFORMAT = SerializationFormat.Avro;
+ private static final boolean FAIL_ON_MISSING_FIELD = false;
+ private static final boolean IGNORE_PARSE_ERRORS = false;
+ private static final TimestampFormat TIMESTAMP_FORMAT = TimestampFormat.SQL;
+ private static final JsonFormatOptions.MapNullKeyMode MAP_NULL_KEY_MODE = JsonFormatOptions.MapNullKeyMode.FAIL;
+ private static final String MAP_NULL_KEY_LITERAL = "null";
+ private static final boolean ENCODE_DECIMAL_AS_PLAIN_NUMBER = false;
+
+ private static final String PB_MESSAGE_CLASS_NAME = "test.pb.Message";
+ private static final Boolean PB_IGNORE_PARSE_ERRORS = false;
+ private static final Boolean PB_READ_DEFAULT_VALUES = false;
+ private static final String PB_WRITE_NULL_STRING_LITERAL = "null";
+
+ @Test
+ public void testSeDeSchema() {
+ final PravegaRegistryRowDataDeserializationSchema expectedDeser = new PravegaRegistryRowDataDeserializationSchema(
+ ROW_TYPE,
+ InternalTypeInfo.of(ROW_TYPE),
+ STREAM,
+ PRAVEGA_CONFIG,
+ FAIL_ON_MISSING_FIELD,
+ IGNORE_PARSE_ERRORS,
+ TIMESTAMP_FORMAT,
+ PB_MESSAGE_CLASS_NAME, PB_IGNORE_PARSE_ERRORS, PB_READ_DEFAULT_VALUES,
+ PB_WRITE_NULL_STRING_LITERAL);
+
+ final Map<String, String> options = getAllOptions();
+
+ final DynamicTableSource actualSource = createTableSource(options);
+ assertThat(actualSource instanceof TestDynamicTableFactory.DynamicTableSourceMock).isTrue();
+ TestDynamicTableFactory.DynamicTableSourceMock sourceMock = (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
+
+ DeserializationSchema<RowData> actualDeser = sourceMock.valueFormat.createRuntimeDecoder(
+ ScanRuntimeProviderContext.INSTANCE, RESOLVED_SCHEMA.toPhysicalRowDataType());
+
+ assertThat(actualDeser).isEqualTo(expectedDeser);
+
+ final PravegaRegistryRowDataSerializationSchema expectedSer = new PravegaRegistryRowDataSerializationSchema(
+ ROW_TYPE,
+ STREAM,
+ SERIALIZATIONFORMAT,
+ PRAVEGA_CONFIG,
+ TIMESTAMP_FORMAT,
+ MAP_NULL_KEY_MODE,
+ MAP_NULL_KEY_LITERAL,
+ ENCODE_DECIMAL_AS_PLAIN_NUMBER, PB_MESSAGE_CLASS_NAME, PB_IGNORE_PARSE_ERRORS,
+ PB_READ_DEFAULT_VALUES, PB_WRITE_NULL_STRING_LITERAL);
+
+ final DynamicTableSink actualSink = createTableSink(options);
+ assertThat(actualSink instanceof TestDynamicTableFactory.DynamicTableSinkMock).isTrue();
+ TestDynamicTableFactory.DynamicTableSinkMock sinkMock = (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
+
+ SerializationSchema<RowData> actualSer = sinkMock.valueFormat.createRuntimeEncoder(null,
+ RESOLVED_SCHEMA.toPhysicalRowDataType());
+
+ assertThat(actualSer).isEqualTo(expectedSer);
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private Map<String, String> getAllOptions() {
+ final Map<String, String> options = new HashMap<>();
+ options.put("connector", "test-connector");
+ options.put("target", "MyTarget");
+
+ options.put("format", PravegaRegistryFormatFactory.IDENTIFIER);
+ options.put("pravega-registry.uri", "http://localhost:10092");
+ options.put("pravega-registry.namespace", SCOPE);
+ options.put("pravega-registry.group-id", STREAM);
+ options.put("pravega-registry.format", SERIALIZATIONFORMAT.name());
+ options.put("pravega-registry.fail-on-missing-field", "false");
+ options.put("pravega-registry.ignore-parse-errors", "false");
+ options.put("pravega-registry.timestamp-format.standard", "SQL");
+ options.put("pravega-registry.map-null-key.mode", "FAIL");
+ options.put("pravega-registry.map-null-key.literal", "null");
+
+ options.put("pravega-registry.security.auth-type", "Basic");
+ options.put("pravega-registry.security.auth-token", TOKEN);
+ options.put("pravega-registry.security.validate-hostname", "false");
+ options.put("pravega-registry.security.trust-store", TRUST_STORE);
+ return options;
+ }
+
+ private static DynamicTableSource createTableSource(Map<String, String> options) {
+ CatalogTable table = new CatalogTableImpl(TableSchema.fromResolvedSchema(RESOLVED_SCHEMA), options,
+ "scanTable");
+ return FactoryUtil.createTableSource(
+ null,
+ ObjectIdentifier.of("default", "default", "scanTable"),
+ new ResolvedCatalogTable(table, RESOLVED_SCHEMA),
+ new Configuration(),
+ Thread.currentThread().getContextClassLoader(),
+ false);
+ }
+
+ private static DynamicTableSink createTableSink(Map<String, String> options) {
+ CatalogTable table = new CatalogTableImpl(TableSchema.fromResolvedSchema(RESOLVED_SCHEMA), options,
+ "scanTable");
+ return FactoryUtil.createTableSink(
+ null,
+ ObjectIdentifier.of("default", "default", "scanTable"),
+ new ResolvedCatalogTable(table, RESOLVED_SCHEMA),
+ new Configuration(),
+ Thread.currentThread().getContextClassLoader(),
+ false);
+ }
}
diff --git a/src/test/java/io/pravega/connectors/flink/formats/registry/PravegaRegistrySeDeITCase.java b/src/test/java/io/pravega/connectors/flink/formats/registry/PravegaRegistrySeDeITCase.java
index 21918f8f..fd004deb 100644
--- a/src/test/java/io/pravega/connectors/flink/formats/registry/PravegaRegistrySeDeITCase.java
+++ b/src/test/java/io/pravega/connectors/flink/formats/registry/PravegaRegistrySeDeITCase.java
@@ -17,8 +17,10 @@
package io.pravega.connectors.flink.formats.registry;
import io.pravega.client.stream.Serializer;
+import io.pravega.connectors.flink.formats.registry.testProto.testMessage;
import io.pravega.connectors.flink.table.catalog.pravega.PravegaCatalog;
import io.pravega.connectors.flink.table.catalog.pravega.util.PravegaSchemaUtils;
+import io.pravega.connectors.flink.util.FlinkPravegaUtils;
import io.pravega.connectors.flink.utils.SchemaRegistryTestEnvironment;
import io.pravega.connectors.flink.utils.runtime.PravegaRuntime;
import io.pravega.connectors.flink.utils.runtime.SchemaRegistryRuntime;
@@ -27,6 +29,7 @@
import io.pravega.schemaregistry.contract.data.SerializationFormat;
import io.pravega.schemaregistry.serializer.avro.schemas.AvroSchema;
import io.pravega.schemaregistry.serializer.json.schemas.JSONSchema;
+import io.pravega.schemaregistry.serializer.protobuf.schemas.ProtobufSchema;
import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig;
import io.pravega.schemaregistry.serializers.SerializerFactory;
import org.apache.avro.Schema;
@@ -86,344 +89,461 @@
import static org.apache.flink.table.api.DataTypes.TINYINT;
import static org.assertj.core.api.Assertions.assertThat;
-/** Intergration Test for Pravega Registry serialization and deserialization schema. */
+/**
+ * Integration Test for Pravega Registry serialization and deserialization
+ * schema.
+ */
@SuppressWarnings("checkstyle:StaticVariableName")
public class PravegaRegistrySeDeITCase {
- private static final String TEST_AVRO_CATALOG_NAME = "mycatalog1";
- private static final String TEST_JSON_CATALOG_NAME = "mycatalog2";
-
- /** Avro fields */
- private static final String AVRO_TEST_STREAM = "stream1";
- private static Schema avroSchema = null;
- private static RowType avroRowType = null;
- private static TypeInformation<RowData> avroTypeInfo = null;
-
- /** Json fields */
- private static final String JSON_TEST_STREAM = "stream2";
- private static JSONSchema<JsonNode> jsonSchema = null;
- private static RowType jsonRowType = null;
- private static TypeInformation<RowData> jsonTypeInfo = null;
- private static DataType jsonDataType = null;
-
- private static final boolean FAIL_ON_MISSING_FIELD = false;
- private static final boolean IGNORE_PARSE_ERRORS = false;
- private static final TimestampFormat TIMESTAMP_FORMAT = TimestampFormat.ISO_8601;
- private static final JsonFormatOptions.MapNullKeyMode MAP_NULL_KEY_MODE =
- JsonFormatOptions.MapNullKeyMode.FAIL;
- private static final String MAP_NULL_KEY_LITERAL = "null";
- private static final boolean ENCODE_DECIMAL_AS_PLAIN_NUMBER = false;
-
- /** Setup utility */
- private static final SchemaRegistryTestEnvironment SCHEMA_REGISTRY =
- new SchemaRegistryTestEnvironment(PravegaRuntime.container(), SchemaRegistryRuntime.container());
-
- @BeforeAll
- public static void setupPravega() throws Exception {
- SCHEMA_REGISTRY.startUp();
- }
-
- @AfterAll
- public static void tearDownPravega() throws Exception {
- SCHEMA_REGISTRY.tearDown();
- }
-
- @Test
- public void testAvroSerializeDeserialize() throws Exception {
- Map<String, String> properties = new HashMap<>();
- properties.put("connector", "pravega");
- properties.put("controller-uri", SCHEMA_REGISTRY.operator().getControllerUri().toString());
- properties.put("format", "pravega-registry");
- properties.put("pravega-registry.uri",
- SCHEMA_REGISTRY.schemaRegistryOperator().getSchemaRegistryUri().toString());
- properties.put("pravega-registry.format", "Avro");
- final PravegaCatalog avroCatalog = new PravegaCatalog(TEST_AVRO_CATALOG_NAME, SCHEMA_REGISTRY.operator().getScope(), properties,
- SCHEMA_REGISTRY.operator().getPravegaConfig()
- .withDefaultScope(SCHEMA_REGISTRY.operator().getScope())
- .withSchemaRegistryURI(SCHEMA_REGISTRY.schemaRegistryOperator().getSchemaRegistryUri()),
- "Avro");
- initAvro();
- avroCatalog.open();
-
- final GenericRecord record = new GenericData.Record(avroSchema);
- record.put(0, true);
- record.put(1, (int) Byte.MAX_VALUE);
- record.put(2, (int) Short.MAX_VALUE);
- record.put(3, 33);
- record.put(4, 44L);
- record.put(5, 12.34F);
- record.put(6, 23.45);
- record.put(7, "hello avro");
- record.put(8, ByteBuffer.wrap(new byte[] {1, 2, 4, 5, 6, 7, 8, 12}));
-
- record.put(
- 9, ByteBuffer.wrap(BigDecimal.valueOf(123456789, 6).unscaledValue().toByteArray()));
-
- List<Double> doubles = new ArrayList<>();
- doubles.add(1.2);
- doubles.add(3.4);
- doubles.add(567.8901);
- record.put(10, doubles);
-
- record.put(11, 18397);
- record.put(12, 10087);
- record.put(13, 1589530213123L);
- record.put(14, 1589530213122L);
-
- Map<String, Long> map = new HashMap<>();
- map.put("flink", 12L);
- map.put("avro", 23L);
- record.put(15, map);
-
- Map<String, Map<String, Integer>> map2map = new HashMap<>();
- Map<String, Integer> innerMap = new HashMap<>();
- innerMap.put("inner_key1", 123);
- innerMap.put("inner_key2", 234);
- map2map.put("outer_key", innerMap);
- record.put(16, map2map);
-
- List<Integer> list1 = Arrays.asList(1, 2, 3, 4, 5, 6);
- List<Integer> list2 = Arrays.asList(11, 22, 33, 44, 55);
- Map<String, List<Integer>> map2list = new HashMap<>();
- map2list.put("list1", list1);
- map2list.put("list2", list2);
- record.put(17, map2list);
-
- Map<String, String> map2 = new HashMap<>();
- map2.put("key1", null);
- record.put(18, map2);
-
- PravegaRegistryRowDataSerializationSchema serializationSchema =
- new PravegaRegistryRowDataSerializationSchema(avroRowType,
- AVRO_TEST_STREAM, SerializationFormat.Avro,
- SCHEMA_REGISTRY.operator().getPravegaConfig().withDefaultScope(SCHEMA_REGISTRY.operator().getScope())
- .withSchemaRegistryURI(SCHEMA_REGISTRY.schemaRegistryOperator().getSchemaRegistryUri()),
- TIMESTAMP_FORMAT, MAP_NULL_KEY_MODE, MAP_NULL_KEY_LITERAL, ENCODE_DECIMAL_AS_PLAIN_NUMBER);
- serializationSchema.open(null);
- PravegaRegistryRowDataDeserializationSchema deserializationSchema =
- new PravegaRegistryRowDataDeserializationSchema(avroRowType, avroTypeInfo,
- AVRO_TEST_STREAM,
- SCHEMA_REGISTRY.operator().getPravegaConfig().withDefaultScope(SCHEMA_REGISTRY.operator().getScope())
- .withSchemaRegistryURI(SCHEMA_REGISTRY.schemaRegistryOperator().getSchemaRegistryUri()),
- FAIL_ON_MISSING_FIELD, IGNORE_PARSE_ERRORS, TIMESTAMP_FORMAT);
- deserializationSchema.open(null);
-
- SchemaRegistryClientConfig schemaRegistryClientConfig = SchemaRegistryClientConfig.builder()
- .schemaRegistryUri(SCHEMA_REGISTRY.schemaRegistryOperator().getSchemaRegistryUri())
- .build();
- SerializerConfig config = SerializerConfig.builder()
- .registryConfig(schemaRegistryClientConfig)
- .namespace(SCHEMA_REGISTRY.operator().getScope())
- .groupId(AVRO_TEST_STREAM)
- .build();
- Serializer<GenericRecord> serializer = SerializerFactory.avroSerializer(config, AvroSchema.ofRecord(avroSchema));
-
- byte[] input = serializer.serialize(record).array();
- RowData rowData = deserializationSchema.deserialize(input);
- byte[] output = serializationSchema.serialize(rowData);
-
- assertThat(output).isEqualTo(input);
-
- avroCatalog.close();
- }
-
- @Test
- public void testJsonDeserialize() throws Exception {
- Map<String, String> properties = new HashMap<>();
- properties.put("connector", "pravega");
- properties.put("controller-uri", SCHEMA_REGISTRY.operator().getControllerUri().toString());
- properties.put("format", "pravega-registry");
- properties.put("pravega-registry.uri",
- SCHEMA_REGISTRY.schemaRegistryOperator().getSchemaRegistryUri().toString());
- properties.put("pravega-registry.format", "Json");
- final PravegaCatalog jsonCatalog = new PravegaCatalog(TEST_JSON_CATALOG_NAME, SCHEMA_REGISTRY.operator().getScope(), properties,
- SCHEMA_REGISTRY.operator().getPravegaConfig()
- .withDefaultScope(SCHEMA_REGISTRY.operator().getScope())
- .withSchemaRegistryURI(SCHEMA_REGISTRY.schemaRegistryOperator().getSchemaRegistryUri()),
- "Json");
- initJson();
- jsonCatalog.open();
-
- byte tinyint = 'c';
- short smallint = 128;
- int intValue = 45536;
- float floatValue = 33.333F;
- long bigint = 1238123899121L;
- String name = "asdlkjasjkdla998y1122";
- byte[] bytes = new byte[1024];
- ThreadLocalRandom.current().nextBytes(bytes);
- BigDecimal decimal = new BigDecimal("123.456789");
- Double[] doubles = new Double[] {1.1, 2.2, 3.3};
- LocalDate date = LocalDate.parse("1990-10-14");
- LocalTime time = LocalTime.parse("12:12:43");
- Timestamp timestamp3 = Timestamp.valueOf("1990-10-14 12:12:43.123");
- Timestamp timestamp9 = Timestamp.valueOf("1990-10-14 12:12:43.123456789");
- Instant timestampWithLocalZone =
- LocalDateTime.of(1990, 10, 14, 12, 12, 43, 123456789)
- .atOffset(ZoneOffset.of("Z"))
- .toInstant();
-
- Map<String, Long> map = new HashMap<>();
- map.put("flink", 123L);
-
- Map<String, Integer> multiSet = new HashMap<>();
- multiSet.put("blink", 2);
-
- Map<String, Map<String, Integer>> nestedMap = new HashMap<>();
- Map<String, Integer> innerMap = new HashMap<>();
- innerMap.put("key", 234);
- nestedMap.put("inner_map", innerMap);
-
- ObjectMapper objectMapper = new ObjectMapper();
- ArrayNode doubleNode = objectMapper.createArrayNode().add(1.1D).add(2.2D).add(3.3D);
-
- // Root
- ObjectNode root = objectMapper.createObjectNode();
- root.put("bool", true);
- root.put("tinyint", tinyint);
- root.put("smallint", smallint);
- root.put("int", intValue);
- root.put("bigint", bigint);
- root.put("float", floatValue);
- root.put("name", name);
- root.put("bytes", bytes);
- root.put("decimal", decimal);
- root.set("doubles", doubleNode);
- root.put("date", "1990-10-14");
- root.put("time", "12:12:43");
- root.put("timestamp3", "1990-10-14T12:12:43.123");
- root.put("timestamp9", "1990-10-14T12:12:43.123456789");
- root.put("timestampWithLocalZone", "1990-10-14T12:12:43.123456789Z");
- root.putObject("map").put("flink", 123);
- root.putObject("multiSet").put("blink", 2);
- root.putObject("map2map").putObject("inner_map").put("key", 234);
-
- SchemaRegistryClientConfig schemaRegistryClientConfig = SchemaRegistryClientConfig.builder()
- .schemaRegistryUri(SCHEMA_REGISTRY.schemaRegistryOperator().getSchemaRegistryUri())
- .build();
- SerializerConfig serializerConfig = SerializerConfig.builder()
- .registryConfig(schemaRegistryClientConfig)
- .namespace(SCHEMA_REGISTRY.operator().getScope())
- .groupId(JSON_TEST_STREAM)
- .build();
- Serializer<JsonNode> serializer = new PravegaRegistryRowDataSerializationSchema.FlinkJsonSerializer(
- JSON_TEST_STREAM,
- SchemaRegistryClientFactory.withNamespace(SCHEMA_REGISTRY.operator().getScope(), schemaRegistryClientConfig),
- jsonSchema,
- serializerConfig.getEncoder(),
- serializerConfig.isRegisterSchema(),
- serializerConfig.isWriteEncodingHeader());
-
- byte[] serializedJson = serializer.serialize(root).array();
-
- // test deserialization
- PravegaRegistryRowDataDeserializationSchema deserializationSchema =
- new PravegaRegistryRowDataDeserializationSchema(
- jsonRowType, jsonTypeInfo, JSON_TEST_STREAM,
- SCHEMA_REGISTRY.operator().getPravegaConfig().withDefaultScope(SCHEMA_REGISTRY.operator().getScope())
- .withSchemaRegistryURI(SCHEMA_REGISTRY.schemaRegistryOperator().getSchemaRegistryUri()),
- FAIL_ON_MISSING_FIELD, IGNORE_PARSE_ERRORS, TIMESTAMP_FORMAT);
- deserializationSchema.open(null);
-
- Row expected = new Row(18);
- expected.setField(0, true);
- expected.setField(1, tinyint);
- expected.setField(2, smallint);
- expected.setField(3, intValue);
- expected.setField(4, bigint);
- expected.setField(5, floatValue);
- expected.setField(6, name);
- expected.setField(7, bytes);
- expected.setField(8, decimal);
- expected.setField(9, doubles);
- expected.setField(10, date);
- expected.setField(11, time);
- expected.setField(12, timestamp3.toLocalDateTime());
- expected.setField(13, timestamp9.toLocalDateTime());
- expected.setField(14, timestampWithLocalZone);
- expected.setField(15, map);
- expected.setField(16, multiSet);
- expected.setField(17, nestedMap);
-
- RowData rowData = deserializationSchema.deserialize(serializedJson);
- Row actual = convertToExternal(rowData, jsonDataType);
- assertThat(actual).isEqualTo(expected);
-
- // test serialization
- PravegaRegistryRowDataSerializationSchema serializationSchema =
- new PravegaRegistryRowDataSerializationSchema(
- jsonRowType, JSON_TEST_STREAM, SerializationFormat.Json,
- SCHEMA_REGISTRY.operator().getPravegaConfig().withDefaultScope(SCHEMA_REGISTRY.operator().getScope())
- .withSchemaRegistryURI(SCHEMA_REGISTRY.schemaRegistryOperator().getSchemaRegistryUri()),
- TIMESTAMP_FORMAT, MAP_NULL_KEY_MODE, MAP_NULL_KEY_LITERAL, ENCODE_DECIMAL_AS_PLAIN_NUMBER);
- serializationSchema.open(null);
-
- byte[] actualBytes = serializationSchema.serialize(rowData);
- assertThat(new String(actualBytes)).isEqualTo(new String(serializedJson));
-
- jsonCatalog.close();
- }
-
- private static void initAvro() throws Exception {
- final DataType dataType =
- ROW(
- FIELD("bool", BOOLEAN()),
- FIELD("tinyint", TINYINT()),
- FIELD("smallint", SMALLINT()),
- FIELD("int", INT()),
- FIELD("bigint", BIGINT()),
- FIELD("float", FLOAT()),
- FIELD("double", DOUBLE()),
- FIELD("name", STRING()),
- FIELD("bytes", BYTES()),
- FIELD("decimal", DECIMAL(19, 6)),
- FIELD("doubles", ARRAY(DOUBLE())),
- FIELD("time", TIME(0)),
- FIELD("date", DATE()),
- FIELD("timestamp3", TIMESTAMP(3)),
- FIELD("timestamp3_2", TIMESTAMP(3)),
- FIELD("map", MAP(STRING(), BIGINT())),
- FIELD("map2map", MAP(STRING(), MAP(STRING(), INT()))),
- FIELD("map2array", MAP(STRING(), ARRAY(INT()))),
- FIELD("nullEntryMap", MAP(STRING(), STRING()))).notNull();
- avroRowType = (RowType) dataType.getLogicalType();
- avroTypeInfo = InternalTypeInfo.of(avroRowType);
- avroSchema = AvroSchemaConverter.convertToSchema(avroRowType);
- SCHEMA_REGISTRY.schemaRegistryOperator().registerSchema(AVRO_TEST_STREAM, AvroSchema.of(avroSchema), SerializationFormat.Avro);
- SCHEMA_REGISTRY.operator().createTestStream(AVRO_TEST_STREAM, 3);
- }
-
- private static void initJson() throws Exception {
- jsonDataType =
- ROW(
- FIELD("bool", BOOLEAN()),
- FIELD("tinyint", TINYINT()),
- FIELD("smallint", SMALLINT()),
- FIELD("int", INT()),
- FIELD("bigint", BIGINT()),
- FIELD("float", FLOAT()),
- FIELD("name", STRING()),
- FIELD("bytes", BYTES()),
- FIELD("decimal", DECIMAL(9, 6)),
- FIELD("doubles", ARRAY(DOUBLE())),
- FIELD("date", DATE()),
- FIELD("time", TIME(0)),
- FIELD("timestamp3", TIMESTAMP(3)),
- FIELD("timestamp9", TIMESTAMP(9)),
- FIELD("timestampWithLocalZone", TIMESTAMP_WITH_LOCAL_TIME_ZONE(9)),
- FIELD("map", MAP(STRING(), BIGINT())),
- FIELD("multiSet", MULTISET(STRING())),
- FIELD("map2map", MAP(STRING(), MAP(STRING(), INT()))));
- jsonRowType = (RowType) jsonDataType.getLogicalType();
- jsonTypeInfo = InternalTypeInfo.of(jsonRowType);
-
- String schemaString = PravegaSchemaUtils.convertToJsonSchemaString(jsonRowType);
- jsonSchema = JSONSchema.of("", schemaString, JsonNode.class);
- SCHEMA_REGISTRY.schemaRegistryOperator().registerSchema(JSON_TEST_STREAM, jsonSchema, SerializationFormat.Json);
- SCHEMA_REGISTRY.operator().createTestStream(JSON_TEST_STREAM, 3);
- }
-
- @SuppressWarnings("unchecked")
- private static Row convertToExternal(RowData rowData, DataType dataType) {
- return (Row) DataFormatConverters.getConverterForDataType(dataType).toExternal(rowData);
- }
+ private static final String TEST_AVRO_CATALOG_NAME = "mycatalog1";
+ private static final String TEST_JSON_CATALOG_NAME = "mycatalog2";
+ private static final String TEST_PROTOBUF_CATALOG_NAME = "mycatalog3";
+
+ /** Avro fields */
+ private static final String AVRO_TEST_STREAM = "stream1";
+ private static Schema avroSchema = null;
+ private static RowType avroRowType = null;
+ private static TypeInformation<RowData> avroTypeInfo = null;
+
+ /** Json fields */
+ private static final String JSON_TEST_STREAM = "stream2";
+ private static JSONSchema<JsonNode> jsonSchema = null;
+ private static RowType jsonRowType = null;
+ private static TypeInformation<RowData> jsonTypeInfo = null;
+ private static DataType jsonDataType = null;
+
+ private static final boolean FAIL_ON_MISSING_FIELD = false;
+ private static final boolean IGNORE_PARSE_ERRORS = false;
+ private static final TimestampFormat TIMESTAMP_FORMAT = TimestampFormat.ISO_8601;
+ private static final JsonFormatOptions.MapNullKeyMode MAP_NULL_KEY_MODE = JsonFormatOptions.MapNullKeyMode.FAIL;
+ private static final String MAP_NULL_KEY_LITERAL = "null";
+ private static final boolean ENCODE_DECIMAL_AS_PLAIN_NUMBER = false;
+
+ /** Protobuf fields */
+ private static final String PROTOBUF_TEST_STREAM = "stream3";
+ private static ProtobufSchema<testMessage> protobufSchema = null;
+ private static RowType protobufRowType = null;
+ private static TypeInformation<RowData> protobufTypeInfo = null;
+ private static DataType protobufDataType = null;
+
+ private static final boolean PB_IGNORE_PARSE_ERRORS = false;
+ private static final String PB_MESSAGE_CLASS_NAME = "io.pravega.connectors.flink.formats.registry.testProto.testMessage";
+ private static final boolean PB_READ_DEFAULT_VALUES = false;
+ private static final String PB_WRITE_NULL_STRING_LITERAL = "null";
+
+ /** Setup utility */
+ private static final SchemaRegistryTestEnvironment SCHEMA_REGISTRY = new SchemaRegistryTestEnvironment(
+ PravegaRuntime.container(), SchemaRegistryRuntime.container());
+
+ @BeforeAll
+ public static void setupPravega() throws Exception {
+ SCHEMA_REGISTRY.startUp();
+ }
+
+ @AfterAll
+ public static void tearDownPravega() throws Exception {
+ SCHEMA_REGISTRY.tearDown();
+ }
+
+ @Test
+ public void testAvroSerializeDeserialize() throws Exception {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("connector", "pravega");
+ properties.put("controller-uri", SCHEMA_REGISTRY.operator().getControllerUri().toString());
+ properties.put("format", "pravega-registry");
+ properties.put("pravega-registry.uri",
+ SCHEMA_REGISTRY.schemaRegistryOperator().getSchemaRegistryUri().toString());
+ properties.put("pravega-registry.format", "Avro");
+ final PravegaCatalog avroCatalog = new PravegaCatalog(TEST_AVRO_CATALOG_NAME,
+ SCHEMA_REGISTRY.operator().getScope(), properties,
+ SCHEMA_REGISTRY.operator().getPravegaConfig()
+ .withDefaultScope(SCHEMA_REGISTRY.operator().getScope())
+ .withSchemaRegistryURI(SCHEMA_REGISTRY.schemaRegistryOperator()
+ .getSchemaRegistryUri()),
+ "Avro");
+ initAvro();
+ avroCatalog.open();
+
+ final GenericRecord record = new GenericData.Record(avroSchema);
+ record.put(0, true);
+ record.put(1, (int) Byte.MAX_VALUE);
+ record.put(2, (int) Short.MAX_VALUE);
+ record.put(3, 33);
+ record.put(4, 44L);
+ record.put(5, 12.34F);
+ record.put(6, 23.45);
+ record.put(7, "hello avro");
+ record.put(8, ByteBuffer.wrap(new byte[] { 1, 2, 4, 5, 6, 7, 8, 12 }));
+
+ record.put(
+ 9, ByteBuffer.wrap(BigDecimal.valueOf(123456789, 6).unscaledValue().toByteArray()));
+
+ List<Double> doubles = new ArrayList<>();
+ doubles.add(1.2);
+ doubles.add(3.4);
+ doubles.add(567.8901);
+ record.put(10, doubles);
+
+ record.put(11, 18397);
+ record.put(12, 10087);
+ record.put(13, 1589530213123L);
+ record.put(14, 1589530213122L);
+
+ Map<String, Long> map = new HashMap<>();
+ map.put("flink", 12L);
+ map.put("avro", 23L);
+ record.put(15, map);
+
+ Map<String, Map<String, Integer>> map2map = new HashMap<>();
+ Map<String, Integer> innerMap = new HashMap<>();
+ innerMap.put("inner_key1", 123);
+ innerMap.put("inner_key2", 234);
+ map2map.put("outer_key", innerMap);
+ record.put(16, map2map);
+
+ List<Integer> list1 = Arrays.asList(1, 2, 3, 4, 5, 6);
+ List<Integer> list2 = Arrays.asList(11, 22, 33, 44, 55);
+ Map<String, List<Integer>> map2list = new HashMap<>();
+ map2list.put("list1", list1);
+ map2list.put("list2", list2);
+ record.put(17, map2list);
+
+ Map<String, String> map2 = new HashMap<>();
+ map2.put("key1", null);
+ record.put(18, map2);
+
+ PravegaRegistryRowDataSerializationSchema serializationSchema = new PravegaRegistryRowDataSerializationSchema(
+ avroRowType,
+ AVRO_TEST_STREAM, SerializationFormat.Avro,
+ SCHEMA_REGISTRY.operator().getPravegaConfig()
+ .withDefaultScope(SCHEMA_REGISTRY.operator().getScope())
+ .withSchemaRegistryURI(SCHEMA_REGISTRY.schemaRegistryOperator()
+ .getSchemaRegistryUri()),
+ TIMESTAMP_FORMAT, MAP_NULL_KEY_MODE, MAP_NULL_KEY_LITERAL,
+ ENCODE_DECIMAL_AS_PLAIN_NUMBER, PB_MESSAGE_CLASS_NAME, PB_IGNORE_PARSE_ERRORS,
+ PB_READ_DEFAULT_VALUES, PB_WRITE_NULL_STRING_LITERAL);
+ serializationSchema.open(null);
+ PravegaRegistryRowDataDeserializationSchema deserializationSchema = new PravegaRegistryRowDataDeserializationSchema(
+ avroRowType, avroTypeInfo,
+ AVRO_TEST_STREAM,
+ SCHEMA_REGISTRY.operator().getPravegaConfig()
+ .withDefaultScope(SCHEMA_REGISTRY.operator().getScope())
+ .withSchemaRegistryURI(SCHEMA_REGISTRY.schemaRegistryOperator()
+ .getSchemaRegistryUri()),
+ FAIL_ON_MISSING_FIELD, IGNORE_PARSE_ERRORS, TIMESTAMP_FORMAT, PB_MESSAGE_CLASS_NAME,
+ PB_IGNORE_PARSE_ERRORS,
+ PB_READ_DEFAULT_VALUES, PB_WRITE_NULL_STRING_LITERAL);
+ deserializationSchema.open(null);
+
+ SchemaRegistryClientConfig schemaRegistryClientConfig = SchemaRegistryClientConfig.builder()
+ .schemaRegistryUri(SCHEMA_REGISTRY.schemaRegistryOperator().getSchemaRegistryUri())
+ .build();
+ SerializerConfig config = SerializerConfig.builder()
+ .registryConfig(schemaRegistryClientConfig)
+ .namespace(SCHEMA_REGISTRY.operator().getScope())
+ .groupId(AVRO_TEST_STREAM)
+ .build();
+ Serializer<GenericRecord> serializer = SerializerFactory.avroSerializer(config,
+ AvroSchema.ofRecord(avroSchema));
+
+ byte[] input = FlinkPravegaUtils
+ .byteBufferToArray(serializer.serialize(record));
+ RowData rowData = deserializationSchema.deserialize(input);
+ byte[] output = serializationSchema.serialize(rowData);
+
+ assertThat(output).isEqualTo(input);
+
+ avroCatalog.close();
+ }
+
+ @Test
+ public void testJsonDeserialize() throws Exception {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("connector", "pravega");
+ properties.put("controller-uri", SCHEMA_REGISTRY.operator().getControllerUri().toString());
+ properties.put("format", "pravega-registry");
+ properties.put("pravega-registry.uri",
+ SCHEMA_REGISTRY.schemaRegistryOperator().getSchemaRegistryUri().toString());
+ properties.put("pravega-registry.format", "Json");
+ final PravegaCatalog jsonCatalog = new PravegaCatalog(TEST_JSON_CATALOG_NAME,
+ SCHEMA_REGISTRY.operator().getScope(), properties,
+ SCHEMA_REGISTRY.operator().getPravegaConfig()
+ .withDefaultScope(SCHEMA_REGISTRY.operator().getScope())
+ .withSchemaRegistryURI(SCHEMA_REGISTRY.schemaRegistryOperator()
+ .getSchemaRegistryUri()),
+ "Json");
+ initJson();
+ jsonCatalog.open();
+
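+ // Expected Java-side values for each field of the JSON row type.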
+ byte tinyint = 'c';
+ short smallint = 128;
+ int intValue = 45536;
+ float floatValue = 33.333F;
+ long bigint = 1238123899121L;
+ String name = "asdlkjasjkdla998y1122";
+ byte[] bytes = new byte[1024];
+ ThreadLocalRandom.current().nextBytes(bytes);
+ BigDecimal decimal = new BigDecimal("123.456789");
+ Double[] doubles = new Double[] { 1.1, 2.2, 3.3 };
+ LocalDate date = LocalDate.parse("1990-10-14");
+ LocalTime time = LocalTime.parse("12:12:43");
+ Timestamp timestamp3 = Timestamp.valueOf("1990-10-14 12:12:43.123");
+ Timestamp timestamp9 = Timestamp.valueOf("1990-10-14 12:12:43.123456789");
+ Instant timestampWithLocalZone = LocalDateTime.of(1990, 10, 14, 12, 12, 43, 123456789)
+ .atOffset(ZoneOffset.of("Z"))
+ .toInstant();
+
+ Map<String, Long> map = new HashMap<>();
+ map.put("flink", 123L);
+
+ Map<String, Integer> multiSet = new HashMap<>();
+ multiSet.put("blink", 2);
+
+ Map<String, Map<String, Integer>> nestedMap = new HashMap<>();
+ Map<String, Integer> innerMap = new HashMap<>();
+ innerMap.put("key", 234);
+ nestedMap.put("inner_map", innerMap);
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ ArrayNode doubleNode = objectMapper.createArrayNode().add(1.1D).add(2.2D).add(3.3D);
+
+ // Root
+ ObjectNode root = objectMapper.createObjectNode();
+ root.put("bool", true);
+ root.put("tinyint", tinyint);
+ root.put("smallint", smallint);
+ root.put("int", intValue);
+ root.put("bigint", bigint);
+ root.put("float", floatValue);
+ root.put("name", name);
+ root.put("bytes", bytes);
+ root.put("decimal", decimal);
+ root.set("doubles", doubleNode);
+ root.put("date", "1990-10-14");
+ root.put("time", "12:12:43");
+ root.put("timestamp3", "1990-10-14T12:12:43.123");
+ root.put("timestamp9", "1990-10-14T12:12:43.123456789");
+ root.put("timestampWithLocalZone", "1990-10-14T12:12:43.123456789Z");
+ root.putObject("map").put("flink", 123);
+ root.putObject("multiSet").put("blink", 2);
+ root.putObject("map2map").putObject("inner_map").put("key", 234);
+
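+ // Serialize the JSON tree through the registry's JSON serializer.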
+ SchemaRegistryClientConfig schemaRegistryClientConfig = SchemaRegistryClientConfig.builder()
+ .schemaRegistryUri(SCHEMA_REGISTRY.schemaRegistryOperator().getSchemaRegistryUri())
+ .build();
+ SerializerConfig serializerConfig = SerializerConfig.builder()
+ .registryConfig(schemaRegistryClientConfig)
+ .namespace(SCHEMA_REGISTRY.operator().getScope())
+ .groupId(JSON_TEST_STREAM)
+ .build();
+ Serializer<JsonNode> serializer = new PravegaRegistryRowDataSerializationSchema.FlinkJsonSerializer(
+ JSON_TEST_STREAM,
+ SchemaRegistryClientFactory.withNamespace(SCHEMA_REGISTRY.operator().getScope(),
+ schemaRegistryClientConfig),
+ jsonSchema,
+ serializerConfig.getEncoder(),
+ serializerConfig.isRegisterSchema(),
+ serializerConfig.isWriteEncodingHeader());
+
+ byte[] serializedJson = serializer.serialize(root).array();
+
+ // test deserialization
+ PravegaRegistryRowDataDeserializationSchema deserializationSchema = new PravegaRegistryRowDataDeserializationSchema(
+ jsonRowType, jsonTypeInfo, JSON_TEST_STREAM,
+ SCHEMA_REGISTRY.operator().getPravegaConfig()
+ .withDefaultScope(SCHEMA_REGISTRY.operator().getScope())
+ .withSchemaRegistryURI(SCHEMA_REGISTRY.schemaRegistryOperator()
+ .getSchemaRegistryUri()),
+ FAIL_ON_MISSING_FIELD, IGNORE_PARSE_ERRORS, TIMESTAMP_FORMAT, PB_MESSAGE_CLASS_NAME,
+ PB_IGNORE_PARSE_ERRORS, PB_READ_DEFAULT_VALUES, PB_WRITE_NULL_STRING_LITERAL);
+ deserializationSchema.open(null);
+
+ Row expected = new Row(18);
+ expected.setField(0, true);
+ expected.setField(1, tinyint);
+ expected.setField(2, smallint);
+ expected.setField(3, intValue);
+ expected.setField(4, bigint);
+ expected.setField(5, floatValue);
+ expected.setField(6, name);
+ expected.setField(7, bytes);
+ expected.setField(8, decimal);
+ expected.setField(9, doubles);
+ expected.setField(10, date);
+ expected.setField(11, time);
+ expected.setField(12, timestamp3.toLocalDateTime());
+ expected.setField(13, timestamp9.toLocalDateTime());
+ expected.setField(14, timestampWithLocalZone);
+ expected.setField(15, map);
+ expected.setField(16, multiSet);
+ expected.setField(17, nestedMap);
+
+ RowData rowData = deserializationSchema.deserialize(serializedJson);
+ Row actual = convertToExternal(rowData, jsonDataType);
+ assertThat(actual).isEqualTo(expected);
+
+ // test serialization
+ PravegaRegistryRowDataSerializationSchema serializationSchema = new PravegaRegistryRowDataSerializationSchema(
+ jsonRowType, JSON_TEST_STREAM, SerializationFormat.Json,
+ SCHEMA_REGISTRY.operator().getPravegaConfig()
+ .withDefaultScope(SCHEMA_REGISTRY.operator().getScope())
+ .withSchemaRegistryURI(SCHEMA_REGISTRY.schemaRegistryOperator()
+ .getSchemaRegistryUri()),
+ TIMESTAMP_FORMAT, MAP_NULL_KEY_MODE, MAP_NULL_KEY_LITERAL,
+ ENCODE_DECIMAL_AS_PLAIN_NUMBER, PB_MESSAGE_CLASS_NAME, PB_IGNORE_PARSE_ERRORS,
+ PB_READ_DEFAULT_VALUES, PB_WRITE_NULL_STRING_LITERAL);
+ serializationSchema.open(null);
+
+ byte[] actualBytes = serializationSchema.serialize(rowData);
+ assertThat(new String(actualBytes)).isEqualTo(new String(serializedJson));
+
+ jsonCatalog.close();
+ }
+
+ @Test
+ public void testProtobufDeserialize() throws Exception {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("connector", "pravega");
+ properties.put("controller-uri", SCHEMA_REGISTRY.operator().getControllerUri().toString());
+ properties.put("format", "pravega-registry");
+ properties.put("pravega-registry.uri",
+ SCHEMA_REGISTRY.schemaRegistryOperator().getSchemaRegistryUri().toString());
+ properties.put("pravega-registry.format", "Protobuf");
+ final PravegaCatalog protobufCatalog = new PravegaCatalog(TEST_PROTOBUF_CATALOG_NAME,
+ SCHEMA_REGISTRY.operator().getScope(), properties,
+ SCHEMA_REGISTRY.operator().getPravegaConfig()
+ .withDefaultScope(SCHEMA_REGISTRY.operator().getScope())
+ .withSchemaRegistryURI(SCHEMA_REGISTRY.schemaRegistryOperator()
+ .getSchemaRegistryUri()),
+ "Protobuf");
+ initProtobuf();
+ protobufCatalog.open();
+
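+ // Set up serialization/deserialization schemas backed by the schema registry.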
+ SchemaRegistryClientConfig schemaRegistryClientConfig = SchemaRegistryClientConfig.builder()
+ .schemaRegistryUri(SCHEMA_REGISTRY.schemaRegistryOperator().getSchemaRegistryUri())
+ .build();
+
+ PravegaRegistryRowDataSerializationSchema serializationSchema = new PravegaRegistryRowDataSerializationSchema(
+ protobufRowType,
+ PROTOBUF_TEST_STREAM, SerializationFormat.Protobuf,
+ SCHEMA_REGISTRY.operator().getPravegaConfig()
+ .withDefaultScope(SCHEMA_REGISTRY.operator().getScope())
+ .withSchemaRegistryURI(SCHEMA_REGISTRY.schemaRegistryOperator()
+ .getSchemaRegistryUri()),
+ TIMESTAMP_FORMAT, MAP_NULL_KEY_MODE, MAP_NULL_KEY_LITERAL,
+ ENCODE_DECIMAL_AS_PLAIN_NUMBER, PB_MESSAGE_CLASS_NAME, PB_IGNORE_PARSE_ERRORS,
+ PB_READ_DEFAULT_VALUES, PB_WRITE_NULL_STRING_LITERAL);
+ serializationSchema.open(null);
+ PravegaRegistryRowDataDeserializationSchema deserializationSchema = new PravegaRegistryRowDataDeserializationSchema(
+ protobufRowType, protobufTypeInfo,
+ PROTOBUF_TEST_STREAM,
+ SCHEMA_REGISTRY.operator().getPravegaConfig()
+ .withDefaultScope(SCHEMA_REGISTRY.operator().getScope())
+ .withSchemaRegistryURI(SCHEMA_REGISTRY.schemaRegistryOperator()
+ .getSchemaRegistryUri()),
+ FAIL_ON_MISSING_FIELD, IGNORE_PARSE_ERRORS, TIMESTAMP_FORMAT, PB_MESSAGE_CLASS_NAME,
+ PB_IGNORE_PARSE_ERRORS, PB_READ_DEFAULT_VALUES, PB_WRITE_NULL_STRING_LITERAL);
+ deserializationSchema.open(null);
+
+ SerializerConfig config = SerializerConfig.builder()
+ .registryConfig(schemaRegistryClientConfig)
+ .namespace(SCHEMA_REGISTRY.operator().getScope())
+ .groupId(PROTOBUF_TEST_STREAM)
+ .build();
+
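+ // Serialize a testMessage and verify the RowData round trip is byte-identical.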
+ Serializer<testMessage> serializer = SerializerFactory.protobufSerializer(config,
+ ProtobufSchema.of(testMessage.class));
+ byte[] input = FlinkPravegaUtils
+ .byteBufferToArray(serializer.serialize(testMessage.newBuilder()
+ .setName("name")
+ .setField1(1)
+ .build()));
+ RowData rowData = deserializationSchema.deserialize(input);
+ byte[] output = serializationSchema.serialize(rowData);
+
+ assertThat(output).isEqualTo(input);
+
+ protobufCatalog.close();
+ }
+
+ private static void initAvro() throws Exception {
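+ // Row type from which the Avro schema is derived and registered for the test stream.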
+ final DataType dataType = ROW(
+ FIELD("bool", BOOLEAN()),
+ FIELD("tinyint", TINYINT()),
+ FIELD("smallint", SMALLINT()),
+ FIELD("int", INT()),
+ FIELD("bigint", BIGINT()),
+ FIELD("float", FLOAT()),
+ FIELD("double", DOUBLE()),
+ FIELD("name", STRING()),
+ FIELD("bytes", BYTES()),
+ FIELD("decimal", DECIMAL(19, 6)),
+ FIELD("doubles", ARRAY(DOUBLE())),
+ FIELD("time", TIME(0)),
+ FIELD("date", DATE()),
+ FIELD("timestamp3", TIMESTAMP(3)),
+ FIELD("timestamp3_2", TIMESTAMP(3)),
+ FIELD("map", MAP(STRING(), BIGINT())),
+ FIELD("map2map", MAP(STRING(), MAP(STRING(), INT()))),
+ FIELD("map2array", MAP(STRING(), ARRAY(INT()))),
+ FIELD("nullEntryMap", MAP(STRING(), STRING()))).notNull();
+ avroRowType = (RowType) dataType.getLogicalType();
+ avroTypeInfo = InternalTypeInfo.of(avroRowType);
+ avroSchema = AvroSchemaConverter.convertToSchema(avroRowType);
+ SCHEMA_REGISTRY.schemaRegistryOperator().registerSchema(AVRO_TEST_STREAM, AvroSchema.of(avroSchema),
+ SerializationFormat.Avro);
+ SCHEMA_REGISTRY.operator().createTestStream(AVRO_TEST_STREAM, 3);
+ }
+
+ private static void initJson() throws Exception {
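+ // Row type matching the JSON schema registered for the test stream.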
+ jsonDataType = ROW(
+ FIELD("bool", BOOLEAN()),
+ FIELD("tinyint", TINYINT()),
+ FIELD("smallint", SMALLINT()),
+ FIELD("int", INT()),
+ FIELD("bigint", BIGINT()),
+ FIELD("float", FLOAT()),
+ FIELD("name", STRING()),
+ FIELD("bytes", BYTES()),
+ FIELD("decimal", DECIMAL(9, 6)),
+ FIELD("doubles", ARRAY(DOUBLE())),
+ FIELD("date", DATE()),
+ FIELD("time", TIME(0)),
+ FIELD("timestamp3", TIMESTAMP(3)),
+ FIELD("timestamp9", TIMESTAMP(9)),
+ FIELD("timestampWithLocalZone", TIMESTAMP_WITH_LOCAL_TIME_ZONE(9)),
+ FIELD("map", MAP(STRING(), BIGINT())),
+ FIELD("multiSet", MULTISET(STRING())),
+ FIELD("map2map", MAP(STRING(), MAP(STRING(), INT()))));
+ jsonRowType = (RowType) jsonDataType.getLogicalType();
+ jsonTypeInfo = InternalTypeInfo.of(jsonRowType);
+
+ String schemaString = PravegaSchemaUtils.convertToJsonSchemaString(jsonRowType);
+ jsonSchema = JSONSchema.of("", schemaString, JsonNode.class);
+ SCHEMA_REGISTRY.schemaRegistryOperator().registerSchema(JSON_TEST_STREAM, jsonSchema,
+ SerializationFormat.Json);
+ SCHEMA_REGISTRY.operator().createTestStream(JSON_TEST_STREAM, 3);
+ }
+
+ private static void initProtobuf() throws Exception {
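+ // Row type matching the generated testMessage protobuf definition.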
+ protobufDataType = ROW(
+ FIELD("name", STRING()),
+ FIELD("field1", INT())).notNull();
+ protobufRowType = (RowType) protobufDataType.getLogicalType();
+ protobufTypeInfo = InternalTypeInfo.of(protobufRowType);
+
+ protobufSchema = ProtobufSchema.of(testMessage.class);
+
+ SCHEMA_REGISTRY.schemaRegistryOperator().registerSchema(PROTOBUF_TEST_STREAM,
+ protobufSchema, SerializationFormat.Protobuf);
+ SCHEMA_REGISTRY.operator().createTestStream(PROTOBUF_TEST_STREAM, 3);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static Row convertToExternal(RowData rowData, DataType dataType) {
+ return (Row) DataFormatConverters.getConverterForDataType(dataType).toExternal(rowData);
+ }
}
diff --git a/src/test/java/io/pravega/connectors/flink/formats/registry/testProto/ProtobufTest.java b/src/test/java/io/pravega/connectors/flink/formats/registry/testProto/ProtobufTest.java
new file mode 100644
index 00000000..0e237092
--- /dev/null
+++ b/src/test/java/io/pravega/connectors/flink/formats/registry/testProto/ProtobufTest.java
@@ -0,0 +1,49 @@
+// Generated by the protocol buffer compiler. DO NOT EDIT!
+// source: resources/proto/protobufTest.proto
+
+package io.pravega.connectors.flink.formats.registry.testProto;
+
+public final class ProtobufTest {
+ private ProtobufTest() {
+ }
+
+ public static void registerAllExtensions(
+ com.google.protobuf.ExtensionRegistryLite registry) {
+ }
+
+ public static void registerAllExtensions(
+ com.google.protobuf.ExtensionRegistry registry) {
+ registerAllExtensions(
+ (com.google.protobuf.ExtensionRegistryLite) registry);
+ }
+
+ static final com.google.protobuf.Descriptors.Descriptor internal_static_io_pravega_connectors_flink_formats_registry_testProto_testMessage_descriptor;
+ static final com.google.protobuf.GeneratedMessageV3.FieldAccessorTable internal_static_io_pravega_connectors_flink_formats_registry_testProto_testMessage_fieldAccessorTable;
+
+ public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() {
+ return descriptor;
+ }
+
+ private static com.google.protobuf.Descriptors.FileDescriptor descriptor;
+ static {
+ java.lang.String[] descriptorData = {
+ "\n\"resources/proto/protobufTest.proto\0226io" +
+ ".pravega.connectors.flink.formats.regist" +
+ "ry.testProto\"+\n\013testMessage\022\014\n\004name\030\001 \001(" +
+ "\t\022\016\n\006field1\030\002 \001(\005B:\n6io.pravega.connecto" +
+ "rs.flink.formats.registry.testProtoP\001b\006p" +
+ "roto3"
+ };
+ descriptor = com.google.protobuf.Descriptors.FileDescriptor
+ .internalBuildGeneratedFileFrom(descriptorData,
+ new com.google.protobuf.Descriptors.FileDescriptor[] {
+ });
+ internal_static_io_pravega_connectors_flink_formats_registry_testProto_testMessage_descriptor = getDescriptor()
+ .getMessageTypes().get(0);
+ internal_static_io_pravega_connectors_flink_formats_registry_testProto_testMessage_fieldAccessorTable = new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
+ internal_static_io_pravega_connectors_flink_formats_registry_testProto_testMessage_descriptor,
+ new java.lang.String[] { "Name", "Field1", });
+ }
+
+ // @@protoc_insertion_point(outer_class_scope)
+}
diff --git a/src/test/java/io/pravega/connectors/flink/formats/registry/testProto/testMessage.java b/src/test/java/io/pravega/connectors/flink/formats/registry/testProto/testMessage.java
new file mode 100644
index 00000000..c0abc8e3
--- /dev/null
+++ b/src/test/java/io/pravega/connectors/flink/formats/registry/testProto/testMessage.java
@@ -0,0 +1,621 @@
+// Generated by the protocol buffer compiler. DO NOT EDIT!
+// source: resources/proto/protobufTest.proto
+
+package io.pravega.connectors.flink.formats.registry.testProto;
+
+/**
+ * Protobuf type {@code io.pravega.connectors.flink.formats.registry.testProto.testMessage}
+ */
+public final class testMessage extends
+ com.google.protobuf.GeneratedMessageV3 implements
+ // @@protoc_insertion_point(message_implements:io.pravega.connectors.flink.formats.registry.testProto.testMessage)
+ testMessageOrBuilder {
+private static final long serialVersionUID = 0L;
+ // Use testMessage.newBuilder() to construct.
+ private testMessage(com.google.protobuf.GeneratedMessageV3.Builder<?> builder) {
+ super(builder);
+ }
+ private testMessage() {
+ name_ = "";
+ }
+
+ @java.lang.Override
+ @SuppressWarnings({"unused"})
+ protected java.lang.Object newInstance(
+ UnusedPrivateParameter unused) {
+ return new testMessage();
+ }
+
+ @java.lang.Override
+ public final com.google.protobuf.UnknownFieldSet
+ getUnknownFields() {
+ return this.unknownFields;
+ }
+ private testMessage(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ this();
+ if (extensionRegistry == null) {
+ throw new java.lang.NullPointerException();
+ }
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder();
+ try {
+ boolean done = false;
+ while (!done) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ done = true;
+ break;
+ case 10: {
+ java.lang.String s = input.readStringRequireUtf8();
+
+ name_ = s;
+ break;
+ }
+ case 16: {
+
+ field1_ = input.readInt32();
+ break;
+ }
+ default: {
+ if (!parseUnknownField(
+ input, unknownFields, extensionRegistry, tag)) {
+ done = true;
+ }
+ break;
+ }
+ }
+ }
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ throw e.setUnfinishedMessage(this);
+ } catch (java.io.IOException e) {
+ throw new com.google.protobuf.InvalidProtocolBufferException(
+ e).setUnfinishedMessage(this);
+ } finally {
+ this.unknownFields = unknownFields.build();
+ makeExtensionsImmutable();
+ }
+ }
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return io.pravega.connectors.flink.formats.registry.testProto.ProtobufTest.internal_static_io_pravega_connectors_flink_formats_registry_testProto_testMessage_descriptor;
+ }
+
+ @java.lang.Override
+ protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return io.pravega.connectors.flink.formats.registry.testProto.ProtobufTest.internal_static_io_pravega_connectors_flink_formats_registry_testProto_testMessage_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ io.pravega.connectors.flink.formats.registry.testProto.testMessage.class, io.pravega.connectors.flink.formats.registry.testProto.testMessage.Builder.class);
+ }
+
+ public static final int NAME_FIELD_NUMBER = 1;
+ private volatile java.lang.Object name_;
+ /**
+ * <code>string name = 1;</code>
+ * @return The name.
+ */
+ @java.lang.Override
+ public java.lang.String getName() {
+ java.lang.Object ref = name_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ name_ = s;
+ return s;
+ }
+ }
+ /**
+ * <code>string name = 1;</code>
+ * @return The bytes for name.
+ */
+ @java.lang.Override
+ public com.google.protobuf.ByteString
+ getNameBytes() {
+ java.lang.Object ref = name_;
+ if (ref instanceof java.lang.String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ name_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+
+ public static final int FIELD1_FIELD_NUMBER = 2;
+ private int field1_;
+ /**
+ * <code>int32 field1 = 2;</code>
+ * @return The field1.
+ */
+ @java.lang.Override
+ public int getField1() {
+ return field1_;
+ }
+
+ private byte memoizedIsInitialized = -1;
+ @java.lang.Override
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized == 1) return true;
+ if (isInitialized == 0) return false;
+
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ @java.lang.Override
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ if (!getNameBytes().isEmpty()) {
+ com.google.protobuf.GeneratedMessageV3.writeString(output, 1, name_);
+ }
+ if (field1_ != 0) {
+ output.writeInt32(2, field1_);
+ }
+ unknownFields.writeTo(output);
+ }
+
+ @java.lang.Override
+ public int getSerializedSize() {
+ int size = memoizedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (!getNameBytes().isEmpty()) {
+ size += com.google.protobuf.GeneratedMessageV3.computeStringSize(1, name_);
+ }
+ if (field1_ != 0) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt32Size(2, field1_);
+ }
+ size += unknownFields.getSerializedSize();
+ memoizedSize = size;
+ return size;
+ }
+
+ @java.lang.Override
+ public boolean equals(final java.lang.Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof io.pravega.connectors.flink.formats.registry.testProto.testMessage)) {
+ return super.equals(obj);
+ }
+ io.pravega.connectors.flink.formats.registry.testProto.testMessage other = (io.pravega.connectors.flink.formats.registry.testProto.testMessage) obj;
+
+ if (!getName()
+ .equals(other.getName())) return false;
+ if (getField1()
+ != other.getField1()) return false;
+ if (!unknownFields.equals(other.unknownFields)) return false;
+ return true;
+ }
+
+ @java.lang.Override
+ public int hashCode() {
+ if (memoizedHashCode != 0) {
+ return memoizedHashCode;
+ }
+ int hash = 41;
+ hash = (19 * hash) + getDescriptor().hashCode();
+ hash = (37 * hash) + NAME_FIELD_NUMBER;
+ hash = (53 * hash) + getName().hashCode();
+ hash = (37 * hash) + FIELD1_FIELD_NUMBER;
+ hash = (53 * hash) + getField1();
+ hash = (29 * hash) + unknownFields.hashCode();
+ memoizedHashCode = hash;
+ return hash;
+ }
+
+ public static io.pravega.connectors.flink.formats.registry.testProto.testMessage parseFrom(
+ java.nio.ByteBuffer data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static io.pravega.connectors.flink.formats.registry.testProto.testMessage parseFrom(
+ java.nio.ByteBuffer data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static io.pravega.connectors.flink.formats.registry.testProto.testMessage parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static io.pravega.connectors.flink.formats.registry.testProto.testMessage parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static io.pravega.connectors.flink.formats.registry.testProto.testMessage parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static io.pravega.connectors.flink.formats.registry.testProto.testMessage parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static io.pravega.connectors.flink.formats.registry.testProto.testMessage parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return com.google.protobuf.GeneratedMessageV3
+ .parseWithIOException(PARSER, input);
+ }
+ public static io.pravega.connectors.flink.formats.registry.testProto.testMessage parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return com.google.protobuf.GeneratedMessageV3
+ .parseWithIOException(PARSER, input, extensionRegistry);
+ }
+ public static io.pravega.connectors.flink.formats.registry.testProto.testMessage parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return com.google.protobuf.GeneratedMessageV3
+ .parseDelimitedWithIOException(PARSER, input);
+ }
+ public static io.pravega.connectors.flink.formats.registry.testProto.testMessage parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return com.google.protobuf.GeneratedMessageV3
+ .parseDelimitedWithIOException(PARSER, input, extensionRegistry);
+ }
+ public static io.pravega.connectors.flink.formats.registry.testProto.testMessage parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return com.google.protobuf.GeneratedMessageV3
+ .parseWithIOException(PARSER, input);
+ }
+ public static io.pravega.connectors.flink.formats.registry.testProto.testMessage parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return com.google.protobuf.GeneratedMessageV3
+ .parseWithIOException(PARSER, input, extensionRegistry);
+ }
+
+ @java.lang.Override
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder() {
+ return DEFAULT_INSTANCE.toBuilder();
+ }
+ public static Builder newBuilder(io.pravega.connectors.flink.formats.registry.testProto.testMessage prototype) {
+ return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype);
+ }
+ @java.lang.Override
+ public Builder toBuilder() {
+ return this == DEFAULT_INSTANCE
+ ? new Builder() : new Builder().mergeFrom(this);
+ }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ com.google.protobuf.GeneratedMessageV3.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ /**
+ * Protobuf type {@code io.pravega.connectors.flink.formats.registry.testProto.testMessage}
+ */
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessageV3.Builder<Builder> implements
+ // @@protoc_insertion_point(builder_implements:io.pravega.connectors.flink.formats.registry.testProto.testMessage)
+ io.pravega.connectors.flink.formats.registry.testProto.testMessageOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return io.pravega.connectors.flink.formats.registry.testProto.ProtobufTest.internal_static_io_pravega_connectors_flink_formats_registry_testProto_testMessage_descriptor;
+ }
+
+ @java.lang.Override
+ protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return io.pravega.connectors.flink.formats.registry.testProto.ProtobufTest.internal_static_io_pravega_connectors_flink_formats_registry_testProto_testMessage_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ io.pravega.connectors.flink.formats.registry.testProto.testMessage.class, io.pravega.connectors.flink.formats.registry.testProto.testMessage.Builder.class);
+ }
+
+ // Construct using io.pravega.connectors.flink.formats.registry.testProto.testMessage.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(
+ com.google.protobuf.GeneratedMessageV3.BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessageV3
+ .alwaysUseFieldBuilders) {
+ }
+ }
+ @java.lang.Override
+ public Builder clear() {
+ super.clear();
+ name_ = "";
+
+ field1_ = 0;
+
+ return this;
+ }
+
+ @java.lang.Override
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return io.pravega.connectors.flink.formats.registry.testProto.ProtobufTest.internal_static_io_pravega_connectors_flink_formats_registry_testProto_testMessage_descriptor;
+ }
+
+ @java.lang.Override
+ public io.pravega.connectors.flink.formats.registry.testProto.testMessage getDefaultInstanceForType() {
+ return io.pravega.connectors.flink.formats.registry.testProto.testMessage.getDefaultInstance();
+ }
+
+ @java.lang.Override
+ public io.pravega.connectors.flink.formats.registry.testProto.testMessage build() {
+ io.pravega.connectors.flink.formats.registry.testProto.testMessage result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ @java.lang.Override
+ public io.pravega.connectors.flink.formats.registry.testProto.testMessage buildPartial() {
+ io.pravega.connectors.flink.formats.registry.testProto.testMessage result = new io.pravega.connectors.flink.formats.registry.testProto.testMessage(this);
+ result.name_ = name_;
+ result.field1_ = field1_;
+ onBuilt();
+ return result;
+ }
+
+ @java.lang.Override
+ public Builder clone() {
+ return super.clone();
+ }
+ @java.lang.Override
+ public Builder setField(
+ com.google.protobuf.Descriptors.FieldDescriptor field,
+ java.lang.Object value) {
+ return super.setField(field, value);
+ }
+ @java.lang.Override
+ public Builder clearField(
+ com.google.protobuf.Descriptors.FieldDescriptor field) {
+ return super.clearField(field);
+ }
+ @java.lang.Override
+ public Builder clearOneof(
+ com.google.protobuf.Descriptors.OneofDescriptor oneof) {
+ return super.clearOneof(oneof);
+ }
+ @java.lang.Override
+ public Builder setRepeatedField(
+ com.google.protobuf.Descriptors.FieldDescriptor field,
+ int index, java.lang.Object value) {
+ return super.setRepeatedField(field, index, value);
+ }
+ @java.lang.Override
+ public Builder addRepeatedField(
+ com.google.protobuf.Descriptors.FieldDescriptor field,
+ java.lang.Object value) {
+ return super.addRepeatedField(field, value);
+ }
+ @java.lang.Override
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof io.pravega.connectors.flink.formats.registry.testProto.testMessage) {
+ return mergeFrom((io.pravega.connectors.flink.formats.registry.testProto.testMessage)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(io.pravega.connectors.flink.formats.registry.testProto.testMessage other) {
+ if (other == io.pravega.connectors.flink.formats.registry.testProto.testMessage.getDefaultInstance()) return this;
+ if (!other.getName().isEmpty()) {
+ name_ = other.name_;
+ onChanged();
+ }
+ if (other.getField1() != 0) {
+ setField1(other.getField1());
+ }
+ this.mergeUnknownFields(other.unknownFields);
+ onChanged();
+ return this;
+ }
+
+ @java.lang.Override
+ public final boolean isInitialized() {
+ return true;
+ }
+
+ @java.lang.Override
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ io.pravega.connectors.flink.formats.registry.testProto.testMessage parsedMessage = null;
+ try {
+ parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+ } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+ parsedMessage = (io.pravega.connectors.flink.formats.registry.testProto.testMessage) e.getUnfinishedMessage();
+ throw e.unwrapIOException();
+ } finally {
+ if (parsedMessage != null) {
+ mergeFrom(parsedMessage);
+ }
+ }
+ return this;
+ }
+
+ private java.lang.Object name_ = "";
+ /**
+ * <code>string name = 1;</code>
+ * @return The name.
+ */
+ public java.lang.String getName() {
+ java.lang.Object ref = name_;
+ if (!(ref instanceof java.lang.String)) {
+ com.google.protobuf.ByteString bs =
+ (com.google.protobuf.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ name_ = s;
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+ /**
+ * <code>string name = 1;</code>
+ * @return The bytes for name.
+ */
+ public com.google.protobuf.ByteString
+ getNameBytes() {
+ java.lang.Object ref = name_;
+ if (ref instanceof String) {
+ com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString.copyFromUtf8(
+ (java.lang.String) ref);
+ name_ = b;
+ return b;
+ } else {
+ return (com.google.protobuf.ByteString) ref;
+ }
+ }
+ /**
+ * <code>string name = 1;</code>
+ * @param value The name to set.
+ * @return This builder for chaining.
+ */
+ public Builder setName(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+
+ name_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>string name = 1;</code>
+ * @return This builder for chaining.
+ */
+ public Builder clearName() {
+
+ name_ = getDefaultInstance().getName();
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>string name = 1;</code>
+ * @param value The bytes for name to set.
+ * @return This builder for chaining.
+ */
+ public Builder setNameBytes(
+ com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ checkByteStringIsUtf8(value);
+
+ name_ = value;
+ onChanged();
+ return this;
+ }
+
+ private int field1_ ;
+ /**
+ * <code>int32 field1 = 2;</code>
+ * @return The field1.
+ */
+ @java.lang.Override
+ public int getField1() {
+ return field1_;
+ }
+ /**
+ * <code>int32 field1 = 2;</code>
+ * @param value The field1 to set.
+ * @return This builder for chaining.
+ */
+ public Builder setField1(int value) {
+
+ field1_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>int32 field1 = 2;</code>
+ * @return This builder for chaining.
+ */
+ public Builder clearField1() {
+
+ field1_ = 0;
+ onChanged();
+ return this;
+ }
+ @java.lang.Override
+ public final Builder setUnknownFields(
+ final com.google.protobuf.UnknownFieldSet unknownFields) {
+ return super.setUnknownFields(unknownFields);
+ }
+
+ @java.lang.Override
+ public final Builder mergeUnknownFields(
+ final com.google.protobuf.UnknownFieldSet unknownFields) {
+ return super.mergeUnknownFields(unknownFields);
+ }
+
+
+ // @@protoc_insertion_point(builder_scope:io.pravega.connectors.flink.formats.registry.testProto.testMessage)
+ }
+
+ // @@protoc_insertion_point(class_scope:io.pravega.connectors.flink.formats.registry.testProto.testMessage)
+ private static final io.pravega.connectors.flink.formats.registry.testProto.testMessage DEFAULT_INSTANCE;
+ static {
+ DEFAULT_INSTANCE = new io.pravega.connectors.flink.formats.registry.testProto.testMessage();
+ }
+
+ public static io.pravega.connectors.flink.formats.registry.testProto.testMessage getDefaultInstance() {
+ return DEFAULT_INSTANCE;
+ }
+
+ private static final com.google.protobuf.Parser<testMessage>
+ PARSER = new com.google.protobuf.AbstractParser<testMessage>() {
+ @java.lang.Override
+ public testMessage parsePartialFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return new testMessage(input, extensionRegistry);
+ }
+ };
+
+ public static com.google.protobuf.Parser<testMessage> parser() {
+ return PARSER;
+ }
+
+ @java.lang.Override
+ public com.google.protobuf.Parser<testMessage> getParserForType() {
+ return PARSER;
+ }
+
+ @java.lang.Override
+ public io.pravega.connectors.flink.formats.registry.testProto.testMessage getDefaultInstanceForType() {
+ return DEFAULT_INSTANCE;
+ }
+
+}
+
diff --git a/src/test/java/io/pravega/connectors/flink/formats/registry/testProto/testMessageOrBuilder.java b/src/test/java/io/pravega/connectors/flink/formats/registry/testProto/testMessageOrBuilder.java
new file mode 100644
index 00000000..8e9bed07
--- /dev/null
+++ b/src/test/java/io/pravega/connectors/flink/formats/registry/testProto/testMessageOrBuilder.java
@@ -0,0 +1,27 @@
+// Generated by the protocol buffer compiler. DO NOT EDIT!
+// source: resources/proto/protobufTest.proto
+
+package io.pravega.connectors.flink.formats.registry.testProto;
+
+public interface testMessageOrBuilder extends
+ // @@protoc_insertion_point(interface_extends:io.pravega.connectors.flink.formats.registry.testProto.testMessage)
+ com.google.protobuf.MessageOrBuilder {
+
+ /**
+ * <code>string name = 1;</code>
+ * @return The name.
+ */
+ java.lang.String getName();
+ /**
+ * <code>string name = 1;</code>
+ * @return The bytes for name.
+ */
+ com.google.protobuf.ByteString
+ getNameBytes();
+
+ /**
+ * <code>int32 field1 = 2;</code>
+ * @return The field1.
+ */
+ int getField1();
+}
diff --git a/src/test/resources/proto/protobufTest.proto b/src/test/resources/proto/protobufTest.proto
new file mode 100644
index 00000000..4a11108f
--- /dev/null
+++ b/src/test/resources/proto/protobufTest.proto
@@ -0,0 +1,9 @@
+syntax = "proto3";
+package io.pravega.connectors.flink.formats.registry.testProto;
+option java_package = "io.pravega.connectors.flink.formats.registry.testProto";
+option java_multiple_files = true;
+
+message testMessage {
+ string name = 1;
+ int32 field1 = 2;
+}