From 3a9e1e83fad577d8408fb820563735a5d5f13945 Mon Sep 17 00:00:00 2001 From: Zouxxyy Date: Tue, 17 Dec 2024 18:15:37 +0800 Subject: [PATCH] [core] Introduce Variant Data (#4729) --- LICENSE | 5 + .../paimon/data/variant/GenericVariant.java | 484 +++++++++++++ .../data/variant/GenericVariantBuilder.java | 639 ++++++++++++++++++ .../data/variant/GenericVariantUtil.java | 628 +++++++++++++++++ .../paimon/data/variant/PathSegment.java | 104 +++ .../apache/paimon/data/variant/Variant.java | 52 ++ .../data/variant/GenericVariantTest.java | 98 +++ 7 files changed, 2010 insertions(+) create mode 100644 paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariant.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariantBuilder.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariantUtil.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/data/variant/PathSegment.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java create mode 100644 paimon-common/src/test/java/org/apache/paimon/data/variant/GenericVariantTest.java diff --git a/LICENSE b/LICENSE index 38bbe5ec276d..d669daad0244 100644 --- a/LICENSE +++ b/LICENSE @@ -270,6 +270,11 @@ paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java from https://parquet.apache.org/ version 1.14.0 +paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariant.java +paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariantBuilder.java +paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariantUtil.java +from https://spark.apache.org/ version 4.0.0-preview2 + MIT License ----------- diff --git a/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariant.java b/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariant.java new file mode 100644 index 000000000000..355e9123cc2d --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariant.java @@ -0,0 +1,484 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.variant; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonFactory; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonGenerator; + +import java.io.CharArrayWriter; +import java.io.IOException; +import java.math.BigDecimal; +import java.time.Instant; +import java.time.LocalDate; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.time.temporal.ChronoUnit; +import java.util.Arrays; +import java.util.Base64; +import java.util.Locale; +import java.util.Objects; + +import static org.apache.paimon.data.variant.GenericVariantUtil.BINARY_SEARCH_THRESHOLD; +import static org.apache.paimon.data.variant.GenericVariantUtil.SIZE_LIMIT; +import static org.apache.paimon.data.variant.GenericVariantUtil.Type; +import static org.apache.paimon.data.variant.GenericVariantUtil.VERSION; +import static org.apache.paimon.data.variant.GenericVariantUtil.VERSION_MASK; +import static org.apache.paimon.data.variant.GenericVariantUtil.checkIndex; +import static org.apache.paimon.data.variant.GenericVariantUtil.getMetadataKey; +import static org.apache.paimon.data.variant.GenericVariantUtil.handleArray; +import static org.apache.paimon.data.variant.GenericVariantUtil.handleObject; +import static org.apache.paimon.data.variant.GenericVariantUtil.malformedVariant; +import static org.apache.paimon.data.variant.GenericVariantUtil.readUnsigned; +import static org.apache.paimon.data.variant.GenericVariantUtil.valueSize; +import static org.apache.paimon.data.variant.GenericVariantUtil.variantConstructorSizeLimit; + +/* This file is based on source code from the Spark Project (http://spark.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** An internal data structure implementing {@link Variant}. */ +public final class GenericVariant implements Variant { + + private final byte[] value; + private final byte[] metadata; + // The variant value doesn't use the whole `value` binary, but starts from its `pos` index and + // spans a size of `valueSize(value, pos)`. This design avoids frequent copies of the value + // binary when reading a sub-variant in the array/object element. + private final int pos; + + public GenericVariant(byte[] value, byte[] metadata) { + this(value, metadata, 0); + } + + private GenericVariant(byte[] value, byte[] metadata, int pos) { + this.value = value; + this.metadata = metadata; + this.pos = pos; + // There is currently only one allowed version. + if (metadata.length < 1 || (metadata[0] & VERSION_MASK) != VERSION) { + throw malformedVariant(); + } + // Don't attempt to use a Variant larger than 16 MiB. We'll never produce one, and it risks + // memory instability. + if (metadata.length > SIZE_LIMIT || value.length > SIZE_LIMIT) { + throw variantConstructorSizeLimit(); + } + } + + @Override + public byte[] value() { + if (pos == 0) { + return value; + } + int size = valueSize(value, pos); + checkIndex(pos + size - 1, value.length); + return Arrays.copyOfRange(value, pos, pos + size); + } + + public byte[] rawValue() { + return value; + } + + @Override + public byte[] metadata() { + return metadata; + } + + public int pos() { + return pos; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + GenericVariant that = (GenericVariant) o; + return pos == that.pos + && Objects.deepEquals(value, that.value) + && Objects.deepEquals(metadata, that.metadata); + } + + @Override + public int hashCode() { + return Objects.hash(Arrays.hashCode(value), Arrays.hashCode(metadata), pos); + } + + public static Variant fromJson(String json) { + try { + return GenericVariantBuilder.parseJson(json, false); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public String toJson() { + return toJson(ZoneOffset.UTC); + } + + // Stringify the variant in JSON format. + // Throw `MALFORMED_VARIANT` if the variant is malformed. + public String toJson(ZoneId zoneId) { + StringBuilder sb = new StringBuilder(); + toJsonImpl(value, metadata, pos, sb, zoneId); + return sb.toString(); + } + + @Override + public String toString() { + return toJson(); + } + + public Object variantGet(String path) { + GenericVariant v = this; + PathSegment[] parsedPath = PathSegment.parse(path); + for (PathSegment pathSegment : parsedPath) { + if (pathSegment.isKey() && v.getType() == Type.OBJECT) { + v = v.getFieldByKey(pathSegment.getKey()); + } else if (pathSegment.isIndex() && v.getType() == Type.ARRAY) { + v = v.getElementAtIndex(pathSegment.getIndex()); + } else { + return null; + } + } + + switch (v.getType()) { + case OBJECT: + case ARRAY: + return v.toJson(); + case STRING: + return v.getString(); + case LONG: + return v.getLong(); + case DOUBLE: + return v.getDouble(); + case DECIMAL: + return v.getDecimal(); + case BOOLEAN: + return v.getBoolean(); + case NULL: + return null; + default: + // todo: support other types + throw new IllegalArgumentException("Unsupported type: " + v.getType()); + } + } + + // Get a boolean value from the variant. + public boolean getBoolean() { + return GenericVariantUtil.getBoolean(value, pos); + } + + // Get a long value from the variant. + public long getLong() { + return GenericVariantUtil.getLong(value, pos); + } + + // Get a double value from the variant. + public double getDouble() { + return GenericVariantUtil.getDouble(value, pos); + } + + // Get a decimal value from the variant. + public BigDecimal getDecimal() { + return GenericVariantUtil.getDecimal(value, pos); + } + + // Get a float value from the variant. + public float getFloat() { + return GenericVariantUtil.getFloat(value, pos); + } + + // Get a binary value from the variant. + public byte[] getBinary() { + return GenericVariantUtil.getBinary(value, pos); + } + + // Get a string value from the variant. + public String getString() { + return GenericVariantUtil.getString(value, pos); + } + + // Get the type info bits from a variant value. + public int getTypeInfo() { + return GenericVariantUtil.getTypeInfo(value, pos); + } + + // Get the value type of the variant. + public Type getType() { + return GenericVariantUtil.getType(value, pos); + } + + // Get the number of object fields in the variant. + // It is only legal to call it when `getType()` is `Type.OBJECT`. + public int objectSize() { + return handleObject( + value, pos, (size, idSize, offsetSize, idStart, offsetStart, dataStart) -> size); + } + + // Find the field value whose key is equal to `key`. Return null if the key is not found. + // It is only legal to call it when `getType()` is `Type.OBJECT`. + public GenericVariant getFieldByKey(String key) { + return handleObject( + value, + pos, + (size, idSize, offsetSize, idStart, offsetStart, dataStart) -> { + // Use linear search for a short list. Switch to binary search when the length + // reaches `BINARY_SEARCH_THRESHOLD`. + if (size < BINARY_SEARCH_THRESHOLD) { + for (int i = 0; i < size; ++i) { + int id = readUnsigned(value, idStart + idSize * i, idSize); + if (key.equals(getMetadataKey(metadata, id))) { + int offset = + readUnsigned( + value, offsetStart + offsetSize * i, offsetSize); + return new GenericVariant(value, metadata, dataStart + offset); + } + } + } else { + int low = 0; + int high = size - 1; + while (low <= high) { + // Use unsigned right shift to compute the middle of `low` and `high`. + // This is not only a performance optimization, because it can properly + // handle the case where `low + high` overflows int. + int mid = (low + high) >>> 1; + int id = readUnsigned(value, idStart + idSize * mid, idSize); + int cmp = getMetadataKey(metadata, id).compareTo(key); + if (cmp < 0) { + low = mid + 1; + } else if (cmp > 0) { + high = mid - 1; + } else { + int offset = + readUnsigned( + value, offsetStart + offsetSize * mid, offsetSize); + return new GenericVariant(value, metadata, dataStart + offset); + } + } + } + return null; + }); + } + + /** Variant object field. */ + public static final class ObjectField { + public final String key; + public final Variant value; + + public ObjectField(String key, Variant value) { + this.key = key; + this.value = value; + } + } + + // Get the object field at the `index` slot. Return null if `index` is out of the bound of + // `[0, objectSize())`. + // It is only legal to call it when `getType()` is `Type.OBJECT`. + public ObjectField getFieldAtIndex(int index) { + return handleObject( + value, + pos, + (size, idSize, offsetSize, idStart, offsetStart, dataStart) -> { + if (index < 0 || index >= size) { + return null; + } + int id = readUnsigned(value, idStart + idSize * index, idSize); + int offset = readUnsigned(value, offsetStart + offsetSize * index, offsetSize); + String key = getMetadataKey(metadata, id); + Variant v = new GenericVariant(value, metadata, dataStart + offset); + return new ObjectField(key, v); + }); + } + + // Get the dictionary ID for the object field at the `index` slot. Throws malformedVariant if + // `index` is out of the bound of `[0, objectSize())`. + // It is only legal to call it when `getType()` is `Type.OBJECT`. + public int getDictionaryIdAtIndex(int index) { + return handleObject( + value, + pos, + (size, idSize, offsetSize, idStart, offsetStart, dataStart) -> { + if (index < 0 || index >= size) { + throw malformedVariant(); + } + return readUnsigned(value, idStart + idSize * index, idSize); + }); + } + + // Get the number of array elements in the variant. + // It is only legal to call it when `getType()` is `Type.ARRAY`. + public int arraySize() { + return handleArray(value, pos, (size, offsetSize, offsetStart, dataStart) -> size); + } + + // Get the array element at the `index` slot. Return null if `index` is out of the bound of + // `[0, arraySize())`. + // It is only legal to call it when `getType()` is `Type.ARRAY`. + public GenericVariant getElementAtIndex(int index) { + return handleArray( + value, + pos, + (size, offsetSize, offsetStart, dataStart) -> { + if (index < 0 || index >= size) { + return null; + } + int offset = readUnsigned(value, offsetStart + offsetSize * index, offsetSize); + return new GenericVariant(value, metadata, dataStart + offset); + }); + } + + // Escape a string so that it can be pasted into JSON structure. + // For example, if `str` only contains a new-line character, then the result content is "\n" + // (4 characters). + private static String escapeJson(String str) { + try (CharArrayWriter writer = new CharArrayWriter(); + JsonGenerator gen = new JsonFactory().createGenerator(writer)) { + gen.writeString(str); + gen.flush(); + return writer.toString(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + // A simplified and more performant version of `sb.append(escapeJson(str))`. It is used when we + // know `str` doesn't contain any special character that needs escaping. + static void appendQuoted(StringBuilder sb, String str) { + sb.append('"'); + sb.append(str); + sb.append('"'); + } + + private static final DateTimeFormatter TIMESTAMP_NTZ_FORMATTER = + new DateTimeFormatterBuilder() + .append(DateTimeFormatter.ISO_LOCAL_DATE) + .appendLiteral(' ') + .append(DateTimeFormatter.ISO_LOCAL_TIME) + .toFormatter(Locale.US); + + private static final DateTimeFormatter TIMESTAMP_FORMATTER = + new DateTimeFormatterBuilder() + .append(TIMESTAMP_NTZ_FORMATTER) + .appendOffset("+HH:MM", "+00:00") + .toFormatter(Locale.US); + + private static Instant microsToInstant(long timestamp) { + return Instant.EPOCH.plus(timestamp, ChronoUnit.MICROS); + } + + private static void toJsonImpl( + byte[] value, byte[] metadata, int pos, StringBuilder sb, ZoneId zoneId) { + switch (GenericVariantUtil.getType(value, pos)) { + case OBJECT: + handleObject( + value, + pos, + (size, idSize, offsetSize, idStart, offsetStart, dataStart) -> { + sb.append('{'); + for (int i = 0; i < size; ++i) { + int id = readUnsigned(value, idStart + idSize * i, idSize); + int offset = + readUnsigned( + value, offsetStart + offsetSize * i, offsetSize); + int elementPos = dataStart + offset; + if (i != 0) { + sb.append(','); + } + sb.append(escapeJson(getMetadataKey(metadata, id))); + sb.append(':'); + toJsonImpl(value, metadata, elementPos, sb, zoneId); + } + sb.append('}'); + return null; + }); + break; + case ARRAY: + handleArray( + value, + pos, + (size, offsetSize, offsetStart, dataStart) -> { + sb.append('['); + for (int i = 0; i < size; ++i) { + int offset = + readUnsigned( + value, offsetStart + offsetSize * i, offsetSize); + int elementPos = dataStart + offset; + if (i != 0) { + sb.append(','); + } + toJsonImpl(value, metadata, elementPos, sb, zoneId); + } + sb.append(']'); + return null; + }); + break; + case NULL: + sb.append("null"); + break; + case BOOLEAN: + sb.append(GenericVariantUtil.getBoolean(value, pos)); + break; + case LONG: + sb.append(GenericVariantUtil.getLong(value, pos)); + break; + case STRING: + sb.append(escapeJson(GenericVariantUtil.getString(value, pos))); + break; + case DOUBLE: + sb.append(GenericVariantUtil.getDouble(value, pos)); + break; + case DECIMAL: + sb.append(GenericVariantUtil.getDecimal(value, pos).toPlainString()); + break; + case DATE: + appendQuoted( + sb, + LocalDate.ofEpochDay((int) GenericVariantUtil.getLong(value, pos)) + .toString()); + break; + case TIMESTAMP: + appendQuoted( + sb, + TIMESTAMP_FORMATTER.format( + microsToInstant(GenericVariantUtil.getLong(value, pos)) + .atZone(zoneId))); + break; + case TIMESTAMP_NTZ: + appendQuoted( + sb, + TIMESTAMP_NTZ_FORMATTER.format( + microsToInstant(GenericVariantUtil.getLong(value, pos)) + .atZone(ZoneOffset.UTC))); + break; + case FLOAT: + sb.append(GenericVariantUtil.getFloat(value, pos)); + break; + case BINARY: + appendQuoted( + sb, + Base64.getEncoder() + .encodeToString(GenericVariantUtil.getBinary(value, pos))); + break; + } + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariantBuilder.java b/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariantBuilder.java new file mode 100644 index 000000000000..187fb9259e0e --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariantBuilder.java @@ -0,0 +1,639 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.variant; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonFactory; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonParseException; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonParser; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonToken; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.exc.InputCoercionException; + +import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; + +import static org.apache.paimon.data.variant.GenericVariantUtil.ARRAY; +import static org.apache.paimon.data.variant.GenericVariantUtil.BASIC_TYPE_MASK; +import static org.apache.paimon.data.variant.GenericVariantUtil.BINARY; +import static org.apache.paimon.data.variant.GenericVariantUtil.DATE; +import static org.apache.paimon.data.variant.GenericVariantUtil.DECIMAL16; +import static org.apache.paimon.data.variant.GenericVariantUtil.DECIMAL4; +import static org.apache.paimon.data.variant.GenericVariantUtil.DECIMAL8; +import static org.apache.paimon.data.variant.GenericVariantUtil.DOUBLE; +import static org.apache.paimon.data.variant.GenericVariantUtil.FALSE; +import static org.apache.paimon.data.variant.GenericVariantUtil.FLOAT; +import static org.apache.paimon.data.variant.GenericVariantUtil.INT1; +import static org.apache.paimon.data.variant.GenericVariantUtil.INT2; +import static org.apache.paimon.data.variant.GenericVariantUtil.INT4; +import static org.apache.paimon.data.variant.GenericVariantUtil.INT8; +import static org.apache.paimon.data.variant.GenericVariantUtil.LONG_STR; +import static org.apache.paimon.data.variant.GenericVariantUtil.MAX_DECIMAL16_PRECISION; +import static org.apache.paimon.data.variant.GenericVariantUtil.MAX_DECIMAL4_PRECISION; +import static org.apache.paimon.data.variant.GenericVariantUtil.MAX_DECIMAL8_PRECISION; +import static org.apache.paimon.data.variant.GenericVariantUtil.MAX_SHORT_STR_SIZE; +import static org.apache.paimon.data.variant.GenericVariantUtil.NULL; +import static org.apache.paimon.data.variant.GenericVariantUtil.OBJECT; +import static org.apache.paimon.data.variant.GenericVariantUtil.SIZE_LIMIT; +import static org.apache.paimon.data.variant.GenericVariantUtil.TIMESTAMP; +import static org.apache.paimon.data.variant.GenericVariantUtil.TIMESTAMP_NTZ; +import static org.apache.paimon.data.variant.GenericVariantUtil.TRUE; +import static org.apache.paimon.data.variant.GenericVariantUtil.U16_MAX; +import static org.apache.paimon.data.variant.GenericVariantUtil.U24_MAX; +import static org.apache.paimon.data.variant.GenericVariantUtil.U24_SIZE; +import static org.apache.paimon.data.variant.GenericVariantUtil.U32_SIZE; +import static org.apache.paimon.data.variant.GenericVariantUtil.U8_MAX; +import static org.apache.paimon.data.variant.GenericVariantUtil.VERSION; +import static org.apache.paimon.data.variant.GenericVariantUtil.arrayHeader; +import static org.apache.paimon.data.variant.GenericVariantUtil.checkIndex; +import static org.apache.paimon.data.variant.GenericVariantUtil.getMetadataKey; +import static org.apache.paimon.data.variant.GenericVariantUtil.handleArray; +import static org.apache.paimon.data.variant.GenericVariantUtil.handleObject; +import static org.apache.paimon.data.variant.GenericVariantUtil.objectHeader; +import static org.apache.paimon.data.variant.GenericVariantUtil.primitiveHeader; +import static org.apache.paimon.data.variant.GenericVariantUtil.readUnsigned; +import static org.apache.paimon.data.variant.GenericVariantUtil.shortStrHeader; +import static org.apache.paimon.data.variant.GenericVariantUtil.valueSize; +import static org.apache.paimon.data.variant.GenericVariantUtil.writeLong; + +/* This file is based on source code from the Spark Project (http://spark.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** Build variant value and metadata by parsing JSON values. */ +public class GenericVariantBuilder { + public GenericVariantBuilder(boolean allowDuplicateKeys) { + this.allowDuplicateKeys = allowDuplicateKeys; + } + + /** + * Parse a JSON string as a Variant value. + * + * @throws IOException if any JSON parsing error happens. + */ + public static Variant parseJson(String json, boolean allowDuplicateKeys) throws IOException { + try (JsonParser parser = new JsonFactory().createParser(json)) { + parser.nextToken(); + return parseJson(parser, allowDuplicateKeys); + } + } + + /** + * Similar {@link #parseJson(String, boolean)}, but takes a JSON parser instead of string input. + */ + public static Variant parseJson(JsonParser parser, boolean allowDuplicateKeys) + throws IOException { + GenericVariantBuilder builder = new GenericVariantBuilder(allowDuplicateKeys); + builder.buildJson(parser); + return builder.result(); + } + + // Build the variant metadata from `dictionaryKeys` and return the variant result. + public Variant result() { + int numKeys = dictionaryKeys.size(); + // Use long to avoid overflow in accumulating lengths. + long dictionaryStringSize = 0; + for (byte[] key : dictionaryKeys) { + dictionaryStringSize += key.length; + } + // Determine the number of bytes required per offset entry. + // The largest offset is the one-past-the-end value, which is total string size. It's very + // unlikely that the number of keys could be larger, but incorporate that into the + // calculation in case of pathological data. + long maxSize = Math.max(dictionaryStringSize, numKeys); + if (maxSize > SIZE_LIMIT) { + throw new RuntimeException(); + } + int offsetSize = getIntegerSize((int) maxSize); + + int offsetStart = 1 + offsetSize; + int stringStart = offsetStart + (numKeys + 1) * offsetSize; + long metadataSize = stringStart + dictionaryStringSize; + + if (metadataSize > SIZE_LIMIT) { + throw new RuntimeException(); + } + byte[] metadata = new byte[(int) metadataSize]; + int headerByte = VERSION | ((offsetSize - 1) << 6); + writeLong(metadata, 0, headerByte, 1); + writeLong(metadata, 1, numKeys, offsetSize); + int currentOffset = 0; + for (int i = 0; i < numKeys; ++i) { + writeLong(metadata, offsetStart + i * offsetSize, currentOffset, offsetSize); + byte[] key = dictionaryKeys.get(i); + System.arraycopy(key, 0, metadata, stringStart + currentOffset, key.length); + currentOffset += key.length; + } + writeLong(metadata, offsetStart + numKeys * offsetSize, currentOffset, offsetSize); + return new GenericVariant(Arrays.copyOfRange(writeBuffer, 0, writePos), metadata); + } + + // Return the variant value only, without metadata. + // Used in shredding to produce a final value, where all shredded values refer to a common + // metadata. It is expected to be called instead of `result()`, although it is valid to call + // both + // methods, in any order. + public byte[] valueWithoutMetadata() { + return Arrays.copyOfRange(writeBuffer, 0, writePos); + } + + public void appendString(String str) { + byte[] text = str.getBytes(StandardCharsets.UTF_8); + boolean longStr = text.length > MAX_SHORT_STR_SIZE; + checkCapacity((longStr ? 1 + U32_SIZE : 1) + text.length); + if (longStr) { + writeBuffer[writePos++] = primitiveHeader(LONG_STR); + writeLong(writeBuffer, writePos, text.length, U32_SIZE); + writePos += U32_SIZE; + } else { + writeBuffer[writePos++] = shortStrHeader(text.length); + } + System.arraycopy(text, 0, writeBuffer, writePos, text.length); + writePos += text.length; + } + + public void appendNull() { + checkCapacity(1); + writeBuffer[writePos++] = primitiveHeader(NULL); + } + + public void appendBoolean(boolean b) { + checkCapacity(1); + writeBuffer[writePos++] = primitiveHeader(b ? TRUE : FALSE); + } + + // Append a long value to the variant builder. The actual used integer type depends on the value + // range of the long value. + public void appendLong(long l) { + checkCapacity(1 + 8); + if (l == (byte) l) { + writeBuffer[writePos++] = primitiveHeader(INT1); + writeLong(writeBuffer, writePos, l, 1); + writePos += 1; + } else if (l == (short) l) { + writeBuffer[writePos++] = primitiveHeader(INT2); + writeLong(writeBuffer, writePos, l, 2); + writePos += 2; + } else if (l == (int) l) { + writeBuffer[writePos++] = primitiveHeader(INT4); + writeLong(writeBuffer, writePos, l, 4); + writePos += 4; + } else { + writeBuffer[writePos++] = primitiveHeader(INT8); + writeLong(writeBuffer, writePos, l, 8); + writePos += 8; + } + } + + public void appendDouble(double d) { + checkCapacity(1 + 8); + writeBuffer[writePos++] = primitiveHeader(DOUBLE); + writeLong(writeBuffer, writePos, Double.doubleToLongBits(d), 8); + writePos += 8; + } + + // Append a decimal value to the variant builder. The caller should guarantee that its precision + // and scale fit into `MAX_DECIMAL16_PRECISION`. + public void appendDecimal(BigDecimal d) { + checkCapacity(2 + 16); + BigInteger unscaled = d.unscaledValue(); + if (d.scale() <= MAX_DECIMAL4_PRECISION && d.precision() <= MAX_DECIMAL4_PRECISION) { + writeBuffer[writePos++] = primitiveHeader(DECIMAL4); + writeBuffer[writePos++] = (byte) d.scale(); + writeLong(writeBuffer, writePos, unscaled.intValueExact(), 4); + writePos += 4; + } else if (d.scale() <= MAX_DECIMAL8_PRECISION && d.precision() <= MAX_DECIMAL8_PRECISION) { + writeBuffer[writePos++] = primitiveHeader(DECIMAL8); + writeBuffer[writePos++] = (byte) d.scale(); + writeLong(writeBuffer, writePos, unscaled.longValueExact(), 8); + writePos += 8; + } else { + assert d.scale() <= MAX_DECIMAL16_PRECISION && d.precision() <= MAX_DECIMAL16_PRECISION; + writeBuffer[writePos++] = primitiveHeader(DECIMAL16); + writeBuffer[writePos++] = (byte) d.scale(); + // `toByteArray` returns a big-endian representation. We need to copy it reversely and + // sign extend it to 16 bytes. + byte[] bytes = unscaled.toByteArray(); + for (int i = 0; i < bytes.length; ++i) { + writeBuffer[writePos + i] = bytes[bytes.length - 1 - i]; + } + byte sign = (byte) (bytes[0] < 0 ? -1 : 0); + for (int i = bytes.length; i < 16; ++i) { + writeBuffer[writePos + i] = sign; + } + writePos += 16; + } + } + + public void appendDate(int daysSinceEpoch) { + checkCapacity(1 + 4); + writeBuffer[writePos++] = primitiveHeader(DATE); + writeLong(writeBuffer, writePos, daysSinceEpoch, 4); + writePos += 4; + } + + public void appendTimestamp(long microsSinceEpoch) { + checkCapacity(1 + 8); + writeBuffer[writePos++] = primitiveHeader(TIMESTAMP); + writeLong(writeBuffer, writePos, microsSinceEpoch, 8); + writePos += 8; + } + + public void appendTimestampNtz(long microsSinceEpoch) { + checkCapacity(1 + 8); + writeBuffer[writePos++] = primitiveHeader(TIMESTAMP_NTZ); + writeLong(writeBuffer, writePos, microsSinceEpoch, 8); + writePos += 8; + } + + public void appendFloat(float f) { + checkCapacity(1 + 4); + writeBuffer[writePos++] = primitiveHeader(FLOAT); + writeLong(writeBuffer, writePos, Float.floatToIntBits(f), 8); + writePos += 4; + } + + public void appendBinary(byte[] binary) { + checkCapacity(1 + U32_SIZE + binary.length); + writeBuffer[writePos++] = primitiveHeader(BINARY); + writeLong(writeBuffer, writePos, binary.length, U32_SIZE); + writePos += U32_SIZE; + System.arraycopy(binary, 0, writeBuffer, writePos, binary.length); + writePos += binary.length; + } + + // Add a key to the variant dictionary. If the key already exists, the dictionary is not + // modified. + // In either case, return the id of the key. + public int addKey(String key) { + int id; + if (dictionary.containsKey(key)) { + id = dictionary.get(key); + } else { + id = dictionaryKeys.size(); + dictionary.put(key, id); + dictionaryKeys.add(key.getBytes(StandardCharsets.UTF_8)); + } + return id; + } + + // Return the current write position of the variant builder. It is used together with + // `finishWritingObject` or `finishWritingArray`. + public int getWritePos() { + return writePos; + } + + // Finish writing a variant object after all of its fields have already been written. The + // process + // is as follows: + // 1. The caller calls `getWritePos` before writing any fields to obtain the `start` parameter. + // 2. The caller appends all the object fields to the builder. In the meantime, it should + // maintain + // the `fields` parameter. Before appending each field, it should append an entry to `fields` to + // record the offset of the field. The offset is computed as `getWritePos() - start`. + // 3. The caller calls `finishWritingObject` to finish writing a variant object. + // + // This function is responsible to sort the fields by key. If there are duplicate field keys: + // - when `allowDuplicateKeys` is true, the field with the greatest offset value (the last + // appended one) is kept. + // - otherwise, throw an exception. + public void finishWritingObject(int start, ArrayList fields) { + int size = fields.size(); + Collections.sort(fields); + int maxId = size == 0 ? 0 : fields.get(0).id; + if (allowDuplicateKeys) { + int distinctPos = 0; + // Maintain a list of distinct keys in-place. + for (int i = 1; i < size; ++i) { + maxId = Math.max(maxId, fields.get(i).id); + if (fields.get(i).id == fields.get(i - 1).id) { + // Found a duplicate key. Keep the field with a greater offset. + if (fields.get(distinctPos).offset < fields.get(i).offset) { + fields.set( + distinctPos, + fields.get(distinctPos).withNewOffset(fields.get(i).offset)); + } + } else { + // Found a distinct key. Add the field to the list. + ++distinctPos; + fields.set(distinctPos, fields.get(i)); + } + } + if (distinctPos + 1 < fields.size()) { + size = distinctPos + 1; + // Resize `fields` to `size`. + fields.subList(size, fields.size()).clear(); + // Sort the fields by offsets so that we can move the value data of each field to + // the new offset without overwriting the fields after it. + fields.sort(Comparator.comparingInt(f -> f.offset)); + int currentOffset = 0; + for (int i = 0; i < size; ++i) { + int oldOffset = fields.get(i).offset; + int fieldSize = GenericVariantUtil.valueSize(writeBuffer, start + oldOffset); + System.arraycopy( + writeBuffer, + start + oldOffset, + writeBuffer, + start + currentOffset, + fieldSize); + fields.set(i, fields.get(i).withNewOffset(currentOffset)); + currentOffset += fieldSize; + } + writePos = start + currentOffset; + // Change back to the sort order by field keys to meet the variant spec. + Collections.sort(fields); + } + } else { + for (int i = 1; i < size; ++i) { + maxId = Math.max(maxId, fields.get(i).id); + String key = fields.get(i).key; + if (key.equals(fields.get(i - 1).key)) { + throw new RuntimeException("VARIANT_DUPLICATE_KEY"); + } + } + } + int dataSize = writePos - start; + boolean largeSize = size > U8_MAX; + int sizeBytes = largeSize ? U32_SIZE : 1; + int idSize = getIntegerSize(maxId); + int offsetSize = getIntegerSize(dataSize); + // The space for header byte, object size, id list, and offset list. + int headerSize = 1 + sizeBytes + size * idSize + (size + 1) * offsetSize; + checkCapacity(headerSize); + // Shift the just-written field data to make room for the object header section. + System.arraycopy(writeBuffer, start, writeBuffer, start + headerSize, dataSize); + writePos += headerSize; + writeBuffer[start] = objectHeader(largeSize, idSize, offsetSize); + writeLong(writeBuffer, start + 1, size, sizeBytes); + int idStart = start + 1 + sizeBytes; + int offsetStart = idStart + size * idSize; + for (int i = 0; i < size; ++i) { + writeLong(writeBuffer, idStart + i * idSize, fields.get(i).id, idSize); + writeLong(writeBuffer, offsetStart + i * offsetSize, fields.get(i).offset, offsetSize); + } + writeLong(writeBuffer, offsetStart + size * offsetSize, dataSize, offsetSize); + } + + // Finish writing a variant array after all of its elements have already been written. The + // process is similar to that of `finishWritingObject`. + public void finishWritingArray(int start, ArrayList offsets) { + int dataSize = writePos - start; + int size = offsets.size(); + boolean largeSize = size > U8_MAX; + int sizeBytes = largeSize ? U32_SIZE : 1; + int offsetSize = getIntegerSize(dataSize); + // The space for header byte, object size, and offset list. + int headerSize = 1 + sizeBytes + (size + 1) * offsetSize; + checkCapacity(headerSize); + // Shift the just-written field data to make room for the header section. + System.arraycopy(writeBuffer, start, writeBuffer, start + headerSize, dataSize); + writePos += headerSize; + writeBuffer[start] = arrayHeader(largeSize, offsetSize); + writeLong(writeBuffer, start + 1, size, sizeBytes); + int offsetStart = start + 1 + sizeBytes; + for (int i = 0; i < size; ++i) { + writeLong(writeBuffer, offsetStart + i * offsetSize, offsets.get(i), offsetSize); + } + writeLong(writeBuffer, offsetStart + size * offsetSize, dataSize, offsetSize); + } + + // Append a variant value to the variant builder. We need to insert the keys in the input + // variant into the current variant dictionary and rebuild it with new field ids. For scalar + // values in the input variant, we can directly copy the binary slice. + public void appendVariant(GenericVariant v) { + appendVariantImpl(v.rawValue(), v.metadata(), v.pos()); + } + + private void appendVariantImpl(byte[] value, byte[] metadata, int pos) { + checkIndex(pos, value.length); + int basicType = value[pos] & BASIC_TYPE_MASK; + switch (basicType) { + case OBJECT: + handleObject( + value, + pos, + (size, idSize, offsetSize, idStart, offsetStart, dataStart) -> { + ArrayList fields = new ArrayList<>(size); + int start = writePos; + for (int i = 0; i < size; ++i) { + int id = readUnsigned(value, idStart + idSize * i, idSize); + int offset = + readUnsigned( + value, offsetStart + offsetSize * i, offsetSize); + int elementPos = dataStart + offset; + String key = getMetadataKey(metadata, id); + int newId = addKey(key); + fields.add(new FieldEntry(key, newId, writePos - start)); + appendVariantImpl(value, metadata, elementPos); + } + finishWritingObject(start, fields); + return null; + }); + break; + case ARRAY: + handleArray( + value, + pos, + (size, offsetSize, offsetStart, dataStart) -> { + ArrayList offsets = new ArrayList<>(size); + int start = writePos; + for (int i = 0; i < size; ++i) { + int offset = + readUnsigned( + value, offsetStart + offsetSize * i, offsetSize); + int elementPos = dataStart + offset; + offsets.add(writePos - start); + appendVariantImpl(value, metadata, elementPos); + } + finishWritingArray(start, offsets); + return null; + }); + break; + default: + shallowAppendVariantImpl(value, pos); + break; + } + } + + // Append the variant value without rewriting or creating any metadata. This is used when + // building an object during shredding, where there is a fixed pre-existing metadata that + // all shredded values will refer to. + public void shallowAppendVariant(GenericVariant v) { + shallowAppendVariantImpl(v.rawValue(), v.pos()); + } + + private void shallowAppendVariantImpl(byte[] value, int pos) { + int size = valueSize(value, pos); + checkIndex(pos + size - 1, value.length); + checkCapacity(size); + System.arraycopy(value, pos, writeBuffer, writePos, size); + writePos += size; + } + + private void checkCapacity(int additional) { + int required = writePos + additional; + if (required > writeBuffer.length) { + // Allocate a new buffer with a capacity of the next power of 2 of `required`. + int newCapacity = Integer.highestOneBit(required); + newCapacity = newCapacity < required ? newCapacity * 2 : newCapacity; + if (newCapacity > SIZE_LIMIT) { + throw new RuntimeException(); + } + byte[] newValue = new byte[newCapacity]; + System.arraycopy(writeBuffer, 0, newValue, 0, writePos); + writeBuffer = newValue; + } + } + + /** + * Temporarily store the information of a field. We need to collect all fields in an JSON + * object, sort them by their keys, and build the variant object in sorted order. + */ + public static final class FieldEntry implements Comparable { + final String key; + final int id; + final int offset; + + public FieldEntry(String key, int id, int offset) { + this.key = key; + this.id = id; + this.offset = offset; + } + + FieldEntry withNewOffset(int newOffset) { + return new FieldEntry(key, id, newOffset); + } + + @Override + public int compareTo(FieldEntry other) { + return key.compareTo(other.key); + } + } + + private void buildJson(JsonParser parser) throws IOException { + JsonToken token = parser.currentToken(); + if (token == null) { + throw new JsonParseException(parser, "Unexpected null token"); + } + switch (token) { + case START_OBJECT: + { + ArrayList fields = new ArrayList<>(); + int start = writePos; + while (parser.nextToken() != JsonToken.END_OBJECT) { + String key = parser.currentName(); + parser.nextToken(); + int id = addKey(key); + fields.add(new FieldEntry(key, id, writePos - start)); + buildJson(parser); + } + finishWritingObject(start, fields); + break; + } + case START_ARRAY: + { + ArrayList offsets = new ArrayList<>(); + int start = writePos; + while (parser.nextToken() != JsonToken.END_ARRAY) { + offsets.add(writePos - start); + buildJson(parser); + } + finishWritingArray(start, offsets); + break; + } + case VALUE_STRING: + appendString(parser.getText()); + break; + case VALUE_NUMBER_INT: + try { + appendLong(parser.getLongValue()); + } catch (InputCoercionException ignored) { + // If the value doesn't fit any integer type, parse it as decimal or floating + // instead. + parseFloatingPoint(parser); + } + break; + case VALUE_NUMBER_FLOAT: + parseFloatingPoint(parser); + break; + case VALUE_TRUE: + appendBoolean(true); + break; + case VALUE_FALSE: + appendBoolean(false); + break; + case VALUE_NULL: + appendNull(); + break; + default: + throw new JsonParseException(parser, "Unexpected token " + token); + } + } + + // Choose the smallest unsigned integer type that can store `value`. It must be within + // `[0, U24_MAX]`. + private int getIntegerSize(int value) { + assert value >= 0 && value <= U24_MAX; + if (value <= U8_MAX) { + return 1; + } + if (value <= U16_MAX) { + return 2; + } + return U24_SIZE; + } + + private void parseFloatingPoint(JsonParser parser) throws IOException { + if (!tryParseDecimal(parser.getText())) { + appendDouble(parser.getDoubleValue()); + } + } + + // Try to parse a JSON number as a decimal. Return whether the parsing succeeds. The input must + // only use the decimal format (an integer value with an optional '.' in it) and must not use + // scientific notation. It also must fit into the precision limitation of decimal types. + private boolean tryParseDecimal(String input) { + for (int i = 0; i < input.length(); ++i) { + char ch = input.charAt(i); + if (ch != '-' && ch != '.' && !(ch >= '0' && ch <= '9')) { + return false; + } + } + BigDecimal d = new BigDecimal(input); + if (d.scale() <= MAX_DECIMAL16_PRECISION && d.precision() <= MAX_DECIMAL16_PRECISION) { + appendDecimal(d); + return true; + } + return false; + } + + // The write buffer in building the variant value. Its first `writePos` bytes has been written. + private byte[] writeBuffer = new byte[128]; + private int writePos = 0; + // Map keys to a monotonically increasing id. + private final HashMap dictionary = new HashMap<>(); + // Store all keys in `dictionary` in the order of id. + private final ArrayList dictionaryKeys = new ArrayList<>(); + private final boolean allowDuplicateKeys; +} diff --git a/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariantUtil.java b/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariantUtil.java new file mode 100644 index 000000000000..b37cbd7f6f29 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/GenericVariantUtil.java @@ -0,0 +1,628 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.variant; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.Arrays; + +/* This file is based on source code from the Spark Project (http://spark.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +/** + * This class defines constants related to the variant format and provides functions for + * manipulating variant binaries. + * + *

A variant is made up of 2 binaries: value and metadata. A variant value consists of a one-byte + * header and a number of content bytes (can be zero). The header byte is divided into upper 6 bits + * (called "type info") and lower 2 bits (called "basic type"). The content format is explained in + * the below constants for all possible basic type and type info values. + * + *

The variant metadata includes a version id and a dictionary of distinct strings + * (case-sensitive). Its binary format is: - Version: 1-byte unsigned integer. The only acceptable + * value is 1 currently. - Dictionary size: 4-byte little-endian unsigned integer. The number of + * keys in the dictionary. - Offsets: (size + 1) * 4-byte little-endian unsigned integers. + * `offsets[i]` represents the starting position of string i, counting starting from the address of + * `offsets[0]`. Strings must be stored contiguously, so we don’t need to store the string size, + * instead, we compute it with `offset[i + 1] - offset[i]`. - UTF-8 string data. + */ +public class GenericVariantUtil { + public static final int BASIC_TYPE_BITS = 2; + public static final int BASIC_TYPE_MASK = 0x3; + public static final int TYPE_INFO_MASK = 0x3F; + // The inclusive maximum value of the type info value. It is the size limit of `SHORT_STR`. + public static final int MAX_SHORT_STR_SIZE = 0x3F; + + // Below is all possible basic type values. + // Primitive value. The type info value must be one of the values in the below section. + public static final int PRIMITIVE = 0; + // Short string value. The type info value is the string size, which must be in `[0, + // kMaxShortStrSize]`. + // The string content bytes directly follow the header byte. + public static final int SHORT_STR = 1; + // Object value. The content contains a size, a list of field ids, a list of field offsets, and + // the actual field data. The length of the id list is `size`, while the length of the offset + // list is `size + 1`, where the last offset represent the total size of the field data. The + // fields in an object must be sorted by the field name in alphabetical order. Duplicate field + // names in one object are not allowed. + // We use 5 bits in the type info to specify the integer type of the object header: it should + // be 0_b4_b3b2_b1b0 (MSB is 0), where: + // - b4 specifies the type of size. When it is 0/1, `size` is a little-endian 1/4-byte + // unsigned integer. + // - b3b2/b1b0 specifies the integer type of id and offset. When the 2 bits are 0/1/2, the + // list contains 1/2/3-byte little-endian unsigned integers. + public static final int OBJECT = 2; + // Array value. The content contains a size, a list of field offsets, and the actual element + // data. It is similar to an object without the id list. The length of the offset list + // is `size + 1`, where the last offset represent the total size of the element data. + // Its type info should be: 000_b2_b1b0: + // - b2 specifies the type of size. + // - b1b0 specifies the integer type of offset. + public static final int ARRAY = 3; + + // Below is all possible type info values for `PRIMITIVE`. + // JSON Null value. Empty content. + public static final int NULL = 0; + // True value. Empty content. + public static final int TRUE = 1; + // False value. Empty content. + public static final int FALSE = 2; + // 1-byte little-endian signed integer. + public static final int INT1 = 3; + // 2-byte little-endian signed integer. + public static final int INT2 = 4; + // 4-byte little-endian signed integer. + public static final int INT4 = 5; + // 4-byte little-endian signed integer. + public static final int INT8 = 6; + // 8-byte IEEE double. + public static final int DOUBLE = 7; + // 4-byte decimal. Content is 1-byte scale + 4-byte little-endian signed integer. + public static final int DECIMAL4 = 8; + // 8-byte decimal. Content is 1-byte scale + 8-byte little-endian signed integer. + public static final int DECIMAL8 = 9; + // 16-byte decimal. Content is 1-byte scale + 16-byte little-endian signed integer. + public static final int DECIMAL16 = 10; + // Date value. Content is 4-byte little-endian signed integer that represents the number of days + // from the Unix epoch. + public static final int DATE = 11; + // Timestamp value. Content is 8-byte little-endian signed integer that represents the number of + // microseconds elapsed since the Unix epoch, 1970-01-01 00:00:00 UTC. It is displayed to users + // in + // their local time zones and may be displayed differently depending on the execution + // environment. + public static final int TIMESTAMP = 12; + // Timestamp_ntz value. It has the same content as `TIMESTAMP` but should always be interpreted + // as if the local time zone is UTC. + public static final int TIMESTAMP_NTZ = 13; + // 4-byte IEEE float. + public static final int FLOAT = 14; + // Binary value. The content is (4-byte little-endian unsigned integer representing the binary + // size) + (size bytes of binary content). + public static final int BINARY = 15; + // Long string value. The content is (4-byte little-endian unsigned integer representing the + // string size) + (size bytes of string content). + public static final int LONG_STR = 16; + + public static final byte VERSION = 1; + // The lower 4 bits of the first metadata byte contain the version. + public static final byte VERSION_MASK = 0x0F; + + public static final int U8_MAX = 0xFF; + public static final int U16_MAX = 0xFFFF; + public static final int U24_MAX = 0xFFFFFF; + public static final int U24_SIZE = 3; + public static final int U32_SIZE = 4; + + // Both variant value and variant metadata need to be no longer than 16MiB. + public static final int SIZE_LIMIT = U24_MAX + 1; + + public static final int MAX_DECIMAL4_PRECISION = 9; + public static final int MAX_DECIMAL8_PRECISION = 18; + public static final int MAX_DECIMAL16_PRECISION = 38; + + public static final int BINARY_SEARCH_THRESHOLD = 32; + + // Write the least significant `numBytes` bytes in `value` into `bytes[pos, pos + numBytes)` in + // little endian. + public static void writeLong(byte[] bytes, int pos, long value, int numBytes) { + for (int i = 0; i < numBytes; ++i) { + bytes[pos + i] = (byte) ((value >>> (8 * i)) & 0xFF); + } + } + + public static byte primitiveHeader(int type) { + return (byte) (type << 2 | PRIMITIVE); + } + + public static byte shortStrHeader(int size) { + return (byte) (size << 2 | SHORT_STR); + } + + public static byte objectHeader(boolean largeSize, int idSize, int offsetSize) { + return (byte) + (((largeSize ? 1 : 0) << (BASIC_TYPE_BITS + 4)) + | ((idSize - 1) << (BASIC_TYPE_BITS + 2)) + | ((offsetSize - 1) << BASIC_TYPE_BITS) + | OBJECT); + } + + public static byte arrayHeader(boolean largeSize, int offsetSize) { + return (byte) + (((largeSize ? 1 : 0) << (BASIC_TYPE_BITS + 2)) + | ((offsetSize - 1) << BASIC_TYPE_BITS) + | ARRAY); + } + + // An exception indicating that the variant value or metadata doesn't + static RuntimeException malformedVariant() { + return new RuntimeException("MALFORMED_VARIANT"); + } + + static RuntimeException unknownPrimitiveTypeInVariant(int id) { + return new RuntimeException("UNKNOWN_PRIMITIVE_TYPE_IN_VARIANT, id: " + id); + } + + // An exception indicating that an external caller tried to call the Variant constructor with + // value or metadata exceeding the 16MiB size limit. We will never construct a Variant this + // large, + // so it should only be possible to encounter this exception when reading a Variant produced by + // another tool. + static RuntimeException variantConstructorSizeLimit() { + return new RuntimeException("VARIANT_CONSTRUCTOR_SIZE_LIMIT"); + } + + // Check the validity of an array index `pos`. Throw `MALFORMED_VARIANT` if it is out of bound, + // meaning that the variant is malformed. + static void checkIndex(int pos, int length) { + if (pos < 0 || pos >= length) { + throw malformedVariant(); + } + } + + // Read a little-endian signed long value from `bytes[pos, pos + numBytes)`. + static long readLong(byte[] bytes, int pos, int numBytes) { + checkIndex(pos, bytes.length); + checkIndex(pos + numBytes - 1, bytes.length); + long result = 0; + // All bytes except the most significant byte should be unsign-extended and shifted (so we + // need `& 0xFF`). The most significant byte should be sign-extended and is handled after + // the loop. + for (int i = 0; i < numBytes - 1; ++i) { + long unsignedByteValue = bytes[pos + i] & 0xFF; + result |= unsignedByteValue << (8 * i); + } + long signedByteValue = bytes[pos + numBytes - 1]; + result |= signedByteValue << (8 * (numBytes - 1)); + return result; + } + + // Read a little-endian unsigned int value from `bytes[pos, pos + numBytes)`. The value must fit + // into a non-negative int (`[0, Integer.MAX_VALUE]`). + static int readUnsigned(byte[] bytes, int pos, int numBytes) { + checkIndex(pos, bytes.length); + checkIndex(pos + numBytes - 1, bytes.length); + int result = 0; + // Similar to the `readLong` loop, but all bytes should be unsign-extended. + for (int i = 0; i < numBytes; ++i) { + int unsignedByteValue = bytes[pos + i] & 0xFF; + result |= unsignedByteValue << (8 * i); + } + if (result < 0) { + throw malformedVariant(); + } + return result; + } + + /** + * The value type of variant value. It is determined by the header byte but not a 1:1 mapping + * (for example, INT1/2/4/8 all maps to `Type.LONG`). + */ + public enum Type { + OBJECT, + ARRAY, + NULL, + BOOLEAN, + LONG, + STRING, + DOUBLE, + DECIMAL, + DATE, + TIMESTAMP, + TIMESTAMP_NTZ, + FLOAT, + BINARY + } + + public static int getTypeInfo(byte[] value, int pos) { + checkIndex(pos, value.length); + return (value[pos] >> BASIC_TYPE_BITS) & TYPE_INFO_MASK; + } + + // Get the value type of variant value `value[pos...]`. It is only legal to call `get*` if + // `getType` returns this type (for example, it is only legal to call `getLong` if `getType` + // returns `Type.Long`). + // Throw `MALFORMED_VARIANT` if the variant is malformed. + public static Type getType(byte[] value, int pos) { + checkIndex(pos, value.length); + int basicType = value[pos] & BASIC_TYPE_MASK; + int typeInfo = (value[pos] >> BASIC_TYPE_BITS) & TYPE_INFO_MASK; + switch (basicType) { + case SHORT_STR: + return Type.STRING; + case OBJECT: + return Type.OBJECT; + case ARRAY: + return Type.ARRAY; + default: + switch (typeInfo) { + case NULL: + return Type.NULL; + case TRUE: + case FALSE: + return Type.BOOLEAN; + case INT1: + case INT2: + case INT4: + case INT8: + return Type.LONG; + case DOUBLE: + return Type.DOUBLE; + case DECIMAL4: + case DECIMAL8: + case DECIMAL16: + return Type.DECIMAL; + case DATE: + return Type.DATE; + case TIMESTAMP: + return Type.TIMESTAMP; + case TIMESTAMP_NTZ: + return Type.TIMESTAMP_NTZ; + case FLOAT: + return Type.FLOAT; + case BINARY: + return Type.BINARY; + case LONG_STR: + return Type.STRING; + default: + throw unknownPrimitiveTypeInVariant(typeInfo); + } + } + } + + // Compute the size in bytes of the variant value `value[pos...]`. `value.length - pos` is an + // upper bound of the size, but the actual size can be smaller. + // Throw `MALFORMED_VARIANT` if the variant is malformed. + public static int valueSize(byte[] value, int pos) { + checkIndex(pos, value.length); + int basicType = value[pos] & BASIC_TYPE_MASK; + int typeInfo = (value[pos] >> BASIC_TYPE_BITS) & TYPE_INFO_MASK; + switch (basicType) { + case SHORT_STR: + return 1 + typeInfo; + case OBJECT: + return handleObject( + value, + pos, + (size, idSize, offsetSize, idStart, offsetStart, dataStart) -> + dataStart + - pos + + readUnsigned( + value, + offsetStart + size * offsetSize, + offsetSize)); + case ARRAY: + return handleArray( + value, + pos, + (size, offsetSize, offsetStart, dataStart) -> + dataStart + - pos + + readUnsigned( + value, + offsetStart + size * offsetSize, + offsetSize)); + default: + switch (typeInfo) { + case NULL: + case TRUE: + case FALSE: + return 1; + case INT1: + return 2; + case INT2: + return 3; + case INT4: + case DATE: + case FLOAT: + return 5; + case INT8: + case DOUBLE: + case TIMESTAMP: + case TIMESTAMP_NTZ: + return 9; + case DECIMAL4: + return 6; + case DECIMAL8: + return 10; + case DECIMAL16: + return 18; + case BINARY: + case LONG_STR: + return 1 + U32_SIZE + readUnsigned(value, pos + 1, U32_SIZE); + default: + throw unknownPrimitiveTypeInVariant(typeInfo); + } + } + } + + static IllegalStateException unexpectedType(Type type) { + return new IllegalStateException("Expect type to be " + type); + } + + // Get a boolean value from variant value `value[pos...]`. + // Throw `MALFORMED_VARIANT` if the variant is malformed. + public static boolean getBoolean(byte[] value, int pos) { + checkIndex(pos, value.length); + int basicType = value[pos] & BASIC_TYPE_MASK; + int typeInfo = (value[pos] >> BASIC_TYPE_BITS) & TYPE_INFO_MASK; + if (basicType != PRIMITIVE || (typeInfo != TRUE && typeInfo != FALSE)) { + throw unexpectedType(Type.BOOLEAN); + } + return typeInfo == TRUE; + } + + // Get a long value from variant value `value[pos...]`. + // It is only legal to call it if `getType` returns one of `Type.LONG/DATE/TIMESTAMP/ + // TIMESTAMP_NTZ`. If the type is `DATE`, the return value is guaranteed to fit into an int and + // represents the number of days from the Unix epoch. + // If the type is `TIMESTAMP/TIMESTAMP_NTZ`, the return value represents the number of + // microseconds from the Unix epoch. + public static long getLong(byte[] value, int pos) { + checkIndex(pos, value.length); + int basicType = value[pos] & BASIC_TYPE_MASK; + int typeInfo = (value[pos] >> BASIC_TYPE_BITS) & TYPE_INFO_MASK; + String exceptionMessage = "Expect type to be LONG/DATE/TIMESTAMP/TIMESTAMP_NTZ"; + if (basicType != PRIMITIVE) { + throw new IllegalStateException(exceptionMessage); + } + switch (typeInfo) { + case INT1: + return readLong(value, pos + 1, 1); + case INT2: + return readLong(value, pos + 1, 2); + case INT4: + case DATE: + return readLong(value, pos + 1, 4); + case INT8: + case TIMESTAMP: + case TIMESTAMP_NTZ: + return readLong(value, pos + 1, 8); + default: + throw new IllegalStateException(exceptionMessage); + } + } + + // Get a double value from variant value `value[pos...]`. + // Throw `MALFORMED_VARIANT` if the variant is malformed. + public static double getDouble(byte[] value, int pos) { + checkIndex(pos, value.length); + int basicType = value[pos] & BASIC_TYPE_MASK; + int typeInfo = (value[pos] >> BASIC_TYPE_BITS) & TYPE_INFO_MASK; + if (basicType != PRIMITIVE || typeInfo != DOUBLE) { + throw unexpectedType(Type.DOUBLE); + } + return Double.longBitsToDouble(readLong(value, pos + 1, 8)); + } + + // Check whether the precision and scale of the decimal are within the limit. + private static void checkDecimal(BigDecimal d, int maxPrecision) { + if (d.precision() > maxPrecision || d.scale() > maxPrecision) { + throw malformedVariant(); + } + } + + // Get a decimal value from variant value `value[pos...]`. + // Throw `MALFORMED_VARIANT` if the variant is malformed. + public static BigDecimal getDecimalWithOriginalScale(byte[] value, int pos) { + checkIndex(pos, value.length); + int basicType = value[pos] & BASIC_TYPE_MASK; + int typeInfo = (value[pos] >> BASIC_TYPE_BITS) & TYPE_INFO_MASK; + if (basicType != PRIMITIVE) { + throw unexpectedType(Type.DECIMAL); + } + // Interpret the scale byte as unsigned. If it is a negative byte, the unsigned value must + // be greater than `MAX_DECIMAL16_PRECISION` and will trigger an error in `checkDecimal`. + int scale = value[pos + 1] & 0xFF; + BigDecimal result; + switch (typeInfo) { + case DECIMAL4: + result = BigDecimal.valueOf(readLong(value, pos + 2, 4), scale); + checkDecimal(result, MAX_DECIMAL4_PRECISION); + break; + case DECIMAL8: + result = BigDecimal.valueOf(readLong(value, pos + 2, 8), scale); + checkDecimal(result, MAX_DECIMAL8_PRECISION); + break; + case DECIMAL16: + checkIndex(pos + 17, value.length); + byte[] bytes = new byte[16]; + // Copy the bytes reversely because the `BigInteger` constructor expects a + // big-endian representation. + for (int i = 0; i < 16; ++i) { + bytes[i] = value[pos + 17 - i]; + } + result = new BigDecimal(new BigInteger(bytes), scale); + checkDecimal(result, MAX_DECIMAL16_PRECISION); + break; + default: + throw unexpectedType(Type.DECIMAL); + } + return result; + } + + public static BigDecimal getDecimal(byte[] value, int pos) { + return getDecimalWithOriginalScale(value, pos).stripTrailingZeros(); + } + + // Get a float value from variant value `value[pos...]`. + // Throw `MALFORMED_VARIANT` if the variant is malformed. + public static float getFloat(byte[] value, int pos) { + checkIndex(pos, value.length); + int basicType = value[pos] & BASIC_TYPE_MASK; + int typeInfo = (value[pos] >> BASIC_TYPE_BITS) & TYPE_INFO_MASK; + if (basicType != PRIMITIVE || typeInfo != FLOAT) { + throw unexpectedType(Type.FLOAT); + } + return Float.intBitsToFloat((int) readLong(value, pos + 1, 4)); + } + + // Get a binary value from variant value `value[pos...]`. + // Throw `MALFORMED_VARIANT` if the variant is malformed. + public static byte[] getBinary(byte[] value, int pos) { + checkIndex(pos, value.length); + int basicType = value[pos] & BASIC_TYPE_MASK; + int typeInfo = (value[pos] >> BASIC_TYPE_BITS) & TYPE_INFO_MASK; + if (basicType != PRIMITIVE || typeInfo != BINARY) { + throw unexpectedType(Type.BINARY); + } + int start = pos + 1 + U32_SIZE; + int length = readUnsigned(value, pos + 1, U32_SIZE); + checkIndex(start + length - 1, value.length); + return Arrays.copyOfRange(value, start, start + length); + } + + // Get a string value from variant value `value[pos...]`. + // Throw `MALFORMED_VARIANT` if the variant is malformed. + public static String getString(byte[] value, int pos) { + checkIndex(pos, value.length); + int basicType = value[pos] & BASIC_TYPE_MASK; + int typeInfo = (value[pos] >> BASIC_TYPE_BITS) & TYPE_INFO_MASK; + if (basicType == SHORT_STR || (basicType == PRIMITIVE && typeInfo == LONG_STR)) { + int start; + int length; + if (basicType == SHORT_STR) { + start = pos + 1; + length = typeInfo; + } else { + start = pos + 1 + U32_SIZE; + length = readUnsigned(value, pos + 1, U32_SIZE); + } + checkIndex(start + length - 1, value.length); + return new String(value, start, length); + } + throw unexpectedType(Type.STRING); + } + + /** 1. */ + public interface ObjectHandler { + /** + * @param size Number of object fields. + * @param idSize The integer size of the field id list. + * @param offsetSize The integer size of the offset list. + * @param idStart The starting index of the field id list in the variant value array. + * @param offsetStart The starting index of the offset list in the variant value array. + * @param dataStart The starting index of field data in the variant value array. + */ + T apply(int size, int idSize, int offsetSize, int idStart, int offsetStart, int dataStart); + } + + // A helper function to access a variant object. It provides `handler` with its required + // parameters and returns what it returns. + public static T handleObject(byte[] value, int pos, ObjectHandler handler) { + checkIndex(pos, value.length); + int basicType = value[pos] & BASIC_TYPE_MASK; + int typeInfo = (value[pos] >> BASIC_TYPE_BITS) & TYPE_INFO_MASK; + if (basicType != OBJECT) { + throw unexpectedType(Type.OBJECT); + } + // Refer to the comment of the `OBJECT` constant for the details of the object header + // encoding. Suppose `typeInfo` has a bit representation of 0_b4_b3b2_b1b0, the following + // line extracts b4 to determine whether the object uses a 1/4-byte size. + boolean largeSize = ((typeInfo >> 4) & 0x1) != 0; + int sizeBytes = (largeSize ? U32_SIZE : 1); + int size = readUnsigned(value, pos + 1, sizeBytes); + // Extracts b3b2 to determine the integer size of the field id list. + int idSize = ((typeInfo >> 2) & 0x3) + 1; + // Extracts b1b0 to determine the integer size of the offset list. + int offsetSize = (typeInfo & 0x3) + 1; + int idStart = pos + 1 + sizeBytes; + int offsetStart = idStart + size * idSize; + int dataStart = offsetStart + (size + 1) * offsetSize; + return handler.apply(size, idSize, offsetSize, idStart, offsetStart, dataStart); + } + + /** 1. */ + public interface ArrayHandler { + /** + * @param size Number of array elements. + * @param offsetSize The integer size of the offset list. + * @param offsetStart The starting index of the offset list in the variant value array. + * @param dataStart The starting index of element data in the variant value array. + */ + T apply(int size, int offsetSize, int offsetStart, int dataStart); + } + + // A helper function to access a variant array. + public static T handleArray(byte[] value, int pos, ArrayHandler handler) { + checkIndex(pos, value.length); + int basicType = value[pos] & BASIC_TYPE_MASK; + int typeInfo = (value[pos] >> BASIC_TYPE_BITS) & TYPE_INFO_MASK; + if (basicType != ARRAY) { + throw unexpectedType(Type.ARRAY); + } + // Refer to the comment of the `ARRAY` constant for the details of the object header + // encoding. + // Suppose `typeInfo` has a bit representation of 000_b2_b1b0, the following line extracts + // b2 to determine whether the object uses a 1/4-byte size. + boolean largeSize = ((typeInfo >> 2) & 0x1) != 0; + int sizeBytes = (largeSize ? U32_SIZE : 1); + int size = readUnsigned(value, pos + 1, sizeBytes); + // Extracts b1b0 to determine the integer size of the offset list. + int offsetSize = (typeInfo & 0x3) + 1; + int offsetStart = pos + 1 + sizeBytes; + int dataStart = offsetStart + (size + 1) * offsetSize; + return handler.apply(size, offsetSize, offsetStart, dataStart); + } + + // Get a key at `id` in the variant metadata. + // Throw `MALFORMED_VARIANT` if the variant is malformed. An out-of-bound `id` is also + // considered a malformed variant because it is read from the corresponding variant value. + public static String getMetadataKey(byte[] metadata, int id) { + checkIndex(0, metadata.length); + // Extracts the highest 2 bits in the metadata header to determine the integer size of the + // offset list. + int offsetSize = ((metadata[0] >> 6) & 0x3) + 1; + int dictSize = readUnsigned(metadata, 1, offsetSize); + if (id >= dictSize) { + throw malformedVariant(); + } + // There are a header byte, a `dictSize` with `offsetSize` bytes, and `(dictSize + 1)` + // offsets before the string data. + int stringStart = 1 + (dictSize + 2) * offsetSize; + int offset = readUnsigned(metadata, 1 + (id + 1) * offsetSize, offsetSize); + int nextOffset = readUnsigned(metadata, 1 + (id + 2) * offsetSize, offsetSize); + if (offset > nextOffset) { + throw malformedVariant(); + } + checkIndex(stringStart + nextOffset - 1, metadata.length); + return new String(metadata, stringStart + offset, nextOffset - offset); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/data/variant/PathSegment.java b/paimon-common/src/main/java/org/apache/paimon/data/variant/PathSegment.java new file mode 100644 index 000000000000..5804fb9fcb3e --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/PathSegment.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.variant; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * A path segment for variant get to represent either an object key access or an array index access. + */ +public class PathSegment { + private final String key; + private final Integer index; + + private PathSegment(String key, Integer index) { + this.key = key; + this.index = index; + } + + public static PathSegment createKeySegment(String key) { + return new PathSegment(key, null); + } + + public static PathSegment createIndexSegment(int index) { + return new PathSegment(null, index); + } + + public boolean isKey() { + return key != null; + } + + public boolean isIndex() { + return index != null; + } + + public String getKey() { + return key; + } + + public Integer getIndex() { + return index; + } + + private static final Pattern ROOT_PATTERN = Pattern.compile("\\$"); + // Parse index segment like `[123]`. + private static final Pattern INDEX_PATTERN = Pattern.compile("\\[(\\d+)]"); + // Parse key segment like `.name` or `['name']` or `["name"]`. + private static final Pattern KEY_PATTERN = + Pattern.compile("\\.([^.\\[]+)|\\['([^']+)']|\\[\"([^\"]+)\"]"); + + public static PathSegment[] parse(String str) { + // Validate root + Matcher rootMatcher = ROOT_PATTERN.matcher(str); + if (str.isEmpty() || !rootMatcher.find()) { + throw new IllegalArgumentException("Invalid path: " + str); + } + + List segments = new ArrayList<>(); + String remaining = str.substring(rootMatcher.end()); + // Parse indexes and keys + while (!remaining.isEmpty()) { + Matcher indexMatcher = INDEX_PATTERN.matcher(remaining); + if (indexMatcher.lookingAt()) { + int index = Integer.parseInt(indexMatcher.group(1)); + segments.add(PathSegment.createIndexSegment(index)); + remaining = remaining.substring(indexMatcher.end()); + continue; + } + + Matcher keyMatcher = KEY_PATTERN.matcher(remaining); + if (keyMatcher.lookingAt()) { + for (int i = 1; i <= 3; i++) { + if (keyMatcher.group(i) != null) { + segments.add(PathSegment.createKeySegment(keyMatcher.group(i))); + break; + } + } + remaining = remaining.substring(keyMatcher.end()); + continue; + } + throw new IllegalArgumentException("Invalid path: " + str); + } + + return segments.toArray(new PathSegment[0]); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java b/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java new file mode 100644 index 000000000000..bfecfd573ad7 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.variant; + +/** + * A Variant represents a type that contain one of: 1) Primitive: A type and corresponding value + * (e.g. INT, STRING); 2) Array: An ordered list of Variant values; 3) Object: An unordered + * collection of string/Variant pairs (i.e. key/value pairs). An object may not contain duplicate + * keys. + * + *

A Variant is encoded with 2 binary values, the value and the metadata. + * + *

The Variant Binary Encoding allows representation of semi-structured data (e.g. JSON) in a + * form that can be efficiently queried by path. The design is intended to allow efficient access to + * nested data even in the presence of very wide or deep structures. + */ +public interface Variant { + + /** Returns the variant metadata. */ + byte[] metadata(); + + /** Returns the variant value. */ + byte[] value(); + + /** Parses the variant to json. */ + String toJson(); + + /** + * Extracts a sub-variant value according to a path which start with a `$`. e.g. + * + *

access object's field: `$.key` or `$['key']` or `$["key"]`. + * + *

access array's first elem: `$.array[0]` + */ + Object variantGet(String path); +} diff --git a/paimon-common/src/test/java/org/apache/paimon/data/variant/GenericVariantTest.java b/paimon-common/src/test/java/org/apache/paimon/data/variant/GenericVariantTest.java new file mode 100644 index 000000000000..dbe9bdf3c1ba --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/data/variant/GenericVariantTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.variant; + +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; + +import static org.apache.paimon.types.DataTypesTest.assertThat; + +/** Test of {@link GenericVariant}. */ +public class GenericVariantTest { + + @Test + public void testToJson() { + String json = + "{\n" + + " \"object\": {\n" + + " \"name\": \"Apache Paimon\",\n" + + " \"age\": 2,\n" + + " \"address\": {\n" + + " \"street\": \"Main St\",\n" + + " \"city\": \"Hangzhou\"\n" + + " }\n" + + " },\n" + + " \"array\": [1, 2, 3, 4, 5],\n" + + " \"string\": \"Hello, World!\",\n" + + " \"long\": 12345678901234,\n" + + " \"double\": 1.0123456789012345678901234567890123456789,\n" + + " \"decimal\": 100.99,\n" + + " \"boolean1\": true,\n" + + " \"boolean2\": false,\n" + + " \"nullField\": null\n" + + "}\n"; + + assertThat(GenericVariant.fromJson(json).toJson()) + .isEqualTo( + "{\"array\":[1,2,3,4,5],\"boolean1\":true,\"boolean2\":false,\"decimal\":100.99,\"double\":1.0123456789012346,\"long\":12345678901234,\"nullField\":null,\"object\":{\"address\":{\"city\":\"Hangzhou\",\"street\":\"Main St\"},\"age\":2,\"name\":\"Apache Paimon\"},\"string\":\"Hello, World!\"}"); + } + + @Test + public void testVariantGet() { + String json = + "{\n" + + " \"object\": {\n" + + " \"name\": \"Apache Paimon\",\n" + + " \"age\": 2,\n" + + " \"address\": {\n" + + " \"street\": \"Main St\",\n" + + " \"city\": \"Hangzhou\"\n" + + " }\n" + + " },\n" + + " \"array\": [1, 2, 3, 4, 5],\n" + + " \"string\": \"Hello, World!\",\n" + + " \"long\": 12345678901234,\n" + + " \"double\": 1.0123456789012345678901234567890123456789,\n" + + " \"decimal\": 100.99,\n" + + " \"boolean1\": true,\n" + + " \"boolean2\": false,\n" + + " \"nullField\": null\n" + + "}\n"; + + Variant variant = GenericVariant.fromJson(json); + assertThat(variant.variantGet("$.object")) + .isEqualTo( + "{\"address\":{\"city\":\"Hangzhou\",\"street\":\"Main St\"},\"age\":2,\"name\":\"Apache Paimon\"}"); + assertThat(variant.variantGet("$.object.name")).isEqualTo("Apache Paimon"); + assertThat(variant.variantGet("$.object.address.street")).isEqualTo("Main St"); + assertThat(variant.variantGet("$[\"object\"]['address'].city")).isEqualTo("Hangzhou"); + assertThat(variant.variantGet("$.array")).isEqualTo("[1,2,3,4,5]"); + assertThat(variant.variantGet("$.array[0]")).isEqualTo(1L); + assertThat(variant.variantGet("$.array[3]")).isEqualTo(4L); + assertThat(variant.variantGet("$.string")).isEqualTo("Hello, World!"); + assertThat(variant.variantGet("$.long")).isEqualTo(12345678901234L); + assertThat(variant.variantGet("$.double")) + .isEqualTo(1.0123456789012345678901234567890123456789); + assertThat(variant.variantGet("$.decimal")).isEqualTo(new BigDecimal("100.99")); + assertThat(variant.variantGet("$.boolean1")).isEqualTo(true); + assertThat(variant.variantGet("$.boolean2")).isEqualTo(false); + assertThat(variant.variantGet("$.nullField")).isNull(); + } +}