From 9873457df6cded72603eb434746fc3ccfca5bd90 Mon Sep 17 00:00:00 2001 From: "ranxianglei.rxl" Date: Wed, 21 Aug 2024 14:44:29 +0800 Subject: [PATCH 01/27] [core] fix hll class not found --- .../java/org/apache/paimon/utils/HllSketchUtil.java | 10 ++++++++++ .../compact/aggregate/FieldHllSketchAgg.java | 11 ++--------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/HllSketchUtil.java b/paimon-common/src/main/java/org/apache/paimon/utils/HllSketchUtil.java index 34c5464f7106..609862dafc20 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/HllSketchUtil.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/HllSketchUtil.java @@ -21,10 +21,20 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.datasketches.hll.HllSketch; +import org.apache.datasketches.hll.TgtHllType; +import org.apache.datasketches.hll.Union; /** A compressed bitmap for 32-bit integer. */ public class HllSketchUtil { + public static byte[] union(byte[] sketchBytes1, byte[] sketchBytes2) { + HllSketch heapify = HllSketch.heapify((byte[]) sketchBytes1); + org.apache.datasketches.hll.Union union = Union.heapify((byte[]) sketchBytes2); + union.update(heapify); + HllSketch result = union.getResult(TgtHllType.HLL_4); + return result.toCompactByteArray(); + } + @VisibleForTesting public static byte[] sketchOf(int... values) { HllSketch hllSketch = new HllSketch(); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldHllSketchAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldHllSketchAgg.java index 93901753645a..0ccf4af6497c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldHllSketchAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldHllSketchAgg.java @@ -19,10 +19,7 @@ package org.apache.paimon.mergetree.compact.aggregate; import org.apache.paimon.types.VarBinaryType; - -import org.apache.datasketches.hll.HllSketch; -import org.apache.datasketches.hll.TgtHllType; -import org.apache.datasketches.hll.Union; +import org.apache.paimon.utils.HllSketchUtil; /** HllSketch aggregate a field of a row. */ public class FieldHllSketchAgg extends FieldAggregator { @@ -50,10 +47,6 @@ public Object agg(Object accumulator, Object inputField) { return accumulator == null ? inputField : accumulator; } - HllSketch heapify = HllSketch.heapify((byte[]) accumulator); - Union union = Union.heapify((byte[]) inputField); - union.update(heapify); - HllSketch result = union.getResult(TgtHllType.HLL_4); - return result.toCompactByteArray(); + return HllSketchUtil.union((byte[]) accumulator, (byte[]) inputField); } } From dedcaa4933759e2b41036cd237c5263ab56e450d Mon Sep 17 00:00:00 2001 From: "ranxianglei.rxl" Date: Sun, 22 Sep 2024 20:16:35 +0800 Subject: [PATCH 02/27] [format][orc] open orc switch useSelected,allowSARGToFilter to make sure pushdown works --- .../paimon/format/orc/OrcFileFormat.java | 1 - .../paimon/format/orc/OrcReaderFactory.java | 14 ++++++--- .../orc/reader/AbstractOrcColumnVector.java | 31 ++++++++++++------- .../orc/reader/OrcArrayColumnVector.java | 7 +++-- .../orc/reader/OrcBytesColumnVector.java | 7 +++-- .../orc/reader/OrcDecimalColumnVector.java | 5 +-- .../orc/reader/OrcDoubleColumnVector.java | 6 ++-- .../OrcLegacyTimestampColumnVector.java | 5 +-- .../orc/reader/OrcLongColumnVector.java | 9 ++++-- .../format/orc/reader/OrcMapColumnVector.java | 10 +++--- .../format/orc/reader/OrcRowColumnVector.java | 8 +++-- .../orc/reader/OrcTimestampColumnVector.java | 5 +-- 12 files changed, 68 insertions(+), 40 deletions(-) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java index de28487b715f..fdff90563440 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java @@ -99,7 +99,6 @@ public Optional createStatsExtractor( public FormatReaderFactory createReaderFactory( RowType projectedRowType, @Nullable List filters) { List orcPredicates = new ArrayList<>(); - if (filters != null) { for (Predicate pred : filters) { Optional orcPred = diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index 5093a5010773..3d583333ad0d 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -123,7 +123,10 @@ public OrcReaderBatch createReaderBatch( for (int i = 0; i < vectors.length; i++) { String name = tableFieldNames.get(i); DataType type = tableFieldTypes.get(i); - vectors[i] = createPaimonVector(orcBatch.cols[tableFieldNames.indexOf(name)], type); + int[] selected = orcBatch.getSelected(); + vectors[i] = + createPaimonVector( + orcBatch.cols[tableFieldNames.indexOf(name)], selected, type); } return new OrcReaderBatch(filePath, orcBatch, new VectorizedColumnBatch(vectors), recycler); } @@ -265,10 +268,11 @@ private static RecordReader createRecordReader( .schema(schema) .range(offsetAndLength.getLeft(), offsetAndLength.getRight()) .useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf)) - .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf)) - .tolerateMissingSchema( - OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf)); - + .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf)); + if (!conjunctPredicates.isEmpty()) { + options.useSelected(true); + options.allowSARGToFilter(true); + } // configure filters if (!conjunctPredicates.isEmpty()) { SearchArgument.Builder b = SearchArgumentFactory.newBuilder(); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java index 21154c4967b7..5b612005f1c9 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java @@ -40,8 +40,15 @@ public abstract class AbstractOrcColumnVector private final ColumnVector vector; - AbstractOrcColumnVector(ColumnVector vector) { + private final int[] selected; + + AbstractOrcColumnVector(ColumnVector vector, int[] selected) { this.vector = vector; + this.selected = selected; + } + + protected int rowMapper(int r) { + return selected[r]; } @Override @@ -50,27 +57,29 @@ public boolean isNullAt(int i) { } public static org.apache.paimon.data.columnar.ColumnVector createPaimonVector( - ColumnVector vector, DataType dataType) { + ColumnVector vector, int[] selected, DataType dataType) { if (vector instanceof LongColumnVector) { if (dataType.getTypeRoot() == DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) { - return new OrcLegacyTimestampColumnVector((LongColumnVector) vector); + return new OrcLegacyTimestampColumnVector((LongColumnVector) vector, selected); } else { - return new OrcLongColumnVector((LongColumnVector) vector); + return new OrcLongColumnVector((LongColumnVector) vector, selected); } } else if (vector instanceof DoubleColumnVector) { - return new OrcDoubleColumnVector((DoubleColumnVector) vector); + return new OrcDoubleColumnVector((DoubleColumnVector) vector, selected); } else if (vector instanceof BytesColumnVector) { - return new OrcBytesColumnVector((BytesColumnVector) vector); + return new OrcBytesColumnVector((BytesColumnVector) vector, selected); } else if (vector instanceof DecimalColumnVector) { - return new OrcDecimalColumnVector((DecimalColumnVector) vector); + return new OrcDecimalColumnVector((DecimalColumnVector) vector, selected); } else if (vector instanceof TimestampColumnVector) { - return new OrcTimestampColumnVector(vector); + return new OrcTimestampColumnVector(vector, selected); } else if (vector instanceof ListColumnVector) { - return new OrcArrayColumnVector((ListColumnVector) vector, (ArrayType) dataType); + return new OrcArrayColumnVector( + (ListColumnVector) vector, selected, (ArrayType) dataType); } else if (vector instanceof StructColumnVector) { - return new OrcRowColumnVector((StructColumnVector) vector, (RowType) dataType); + return new OrcRowColumnVector( + (StructColumnVector) vector, selected, (RowType) dataType); } else if (vector instanceof MapColumnVector) { - return new OrcMapColumnVector((MapColumnVector) vector, (MapType) dataType); + return new OrcMapColumnVector((MapColumnVector) vector, selected, (MapType) dataType); } else { throw new UnsupportedOperationException( "Unsupported vector: " + vector.getClass().getName()); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java index ed16a0b51084..ade154221835 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java @@ -32,14 +32,15 @@ public class OrcArrayColumnVector extends AbstractOrcColumnVector private final ListColumnVector hiveVector; private final ColumnVector paimonVector; - public OrcArrayColumnVector(ListColumnVector hiveVector, ArrayType type) { - super(hiveVector); + public OrcArrayColumnVector(ListColumnVector hiveVector, int[] selected, ArrayType type) { + super(hiveVector, selected); this.hiveVector = hiveVector; - this.paimonVector = createPaimonVector(hiveVector.child, type.getElementType()); + this.paimonVector = createPaimonVector(hiveVector.child, selected, type.getElementType()); } @Override public InternalArray getArray(int i) { + i = rowMapper(i); long offset = hiveVector.offsets[i]; long length = hiveVector.lengths[i]; return new ColumnarArray(paimonVector, (int) offset, (int) length); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcBytesColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcBytesColumnVector.java index d48bad886a47..a2664f3b8e45 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcBytesColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcBytesColumnVector.java @@ -26,17 +26,18 @@ public class OrcBytesColumnVector extends AbstractOrcColumnVector private final BytesColumnVector vector; - public OrcBytesColumnVector(BytesColumnVector vector) { - super(vector); + public OrcBytesColumnVector(BytesColumnVector vector, int[] selected) { + super(vector, selected); this.vector = vector; } @Override public Bytes getBytes(int i) { int rowId = vector.isRepeating ? 0 : i; + int selectedRowId = rowMapper(rowId); byte[][] data = vector.vector; int[] start = vector.start; int[] length = vector.length; - return new Bytes(data[rowId], start[rowId], length[rowId]); + return new Bytes(data[selectedRowId], start[selectedRowId], length[selectedRowId]); } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDecimalColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDecimalColumnVector.java index 9ea4d763a5d8..30963bb29fee 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDecimalColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDecimalColumnVector.java @@ -32,13 +32,14 @@ public class OrcDecimalColumnVector extends AbstractOrcColumnVector private final DecimalColumnVector vector; - public OrcDecimalColumnVector(DecimalColumnVector vector) { - super(vector); + public OrcDecimalColumnVector(DecimalColumnVector vector, int[] selected) { + super(vector, selected); this.vector = vector; } @Override public Decimal getDecimal(int i, int precision, int scale) { + i = rowMapper(i); BigDecimal data = vector.vector[vector.isRepeating ? 0 : i].getHiveDecimal().bigDecimalValue(); return Decimal.fromBigDecimal(data, precision, scale); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDoubleColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDoubleColumnVector.java index 0c0b0cc51d38..0353e4aea9f4 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDoubleColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDoubleColumnVector.java @@ -30,18 +30,20 @@ public class OrcDoubleColumnVector extends AbstractOrcColumnVector private final DoubleColumnVector vector; - public OrcDoubleColumnVector(DoubleColumnVector vector) { - super(vector); + public OrcDoubleColumnVector(DoubleColumnVector vector, int[] selected) { + super(vector, selected); this.vector = vector; } @Override public double getDouble(int i) { + i = rowMapper(i); return vector.vector[vector.isRepeating ? 0 : i]; } @Override public float getFloat(int i) { + i = rowMapper(i); return (float) vector.vector[vector.isRepeating ? 0 : i]; } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLegacyTimestampColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLegacyTimestampColumnVector.java index 18227ecf3dd2..508895632374 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLegacyTimestampColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLegacyTimestampColumnVector.java @@ -34,13 +34,14 @@ public class OrcLegacyTimestampColumnVector extends AbstractOrcColumnVector private final LongColumnVector hiveVector; - OrcLegacyTimestampColumnVector(LongColumnVector vector) { - super(vector); + OrcLegacyTimestampColumnVector(LongColumnVector vector, int[] selected) { + super(vector, selected); this.hiveVector = vector; } @Override public Timestamp getTimestamp(int i, int precision) { + i = rowMapper(i); int index = hiveVector.isRepeating ? 0 : i; java.sql.Timestamp timestamp = toTimestamp(hiveVector.vector[index]); return Timestamp.fromSQLTimestamp(timestamp); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLongColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLongColumnVector.java index e7dfe0e6134e..96f6922029a3 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLongColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLongColumnVector.java @@ -33,33 +33,38 @@ public class OrcLongColumnVector extends AbstractOrcColumnVector private final LongColumnVector vector; - public OrcLongColumnVector(LongColumnVector vector) { - super(vector); + public OrcLongColumnVector(LongColumnVector vector, int[] selected) { + super(vector, selected); this.vector = vector; } @Override public long getLong(int i) { + i = rowMapper(i); return vector.vector[vector.isRepeating ? 0 : i]; } @Override public boolean getBoolean(int i) { + i = rowMapper(i); return vector.vector[vector.isRepeating ? 0 : i] == 1; } @Override public byte getByte(int i) { + i = rowMapper(i); return (byte) vector.vector[vector.isRepeating ? 0 : i]; } @Override public int getInt(int i) { + i = rowMapper(i); return (int) vector.vector[vector.isRepeating ? 0 : i]; } @Override public short getShort(int i) { + i = rowMapper(i); return (short) vector.vector[vector.isRepeating ? 0 : i]; } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcMapColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcMapColumnVector.java index 66a1af6dccf4..e45aea60b59c 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcMapColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcMapColumnVector.java @@ -33,15 +33,17 @@ public class OrcMapColumnVector extends AbstractOrcColumnVector private final ColumnVector keyPaimonVector; private final ColumnVector valuePaimonVector; - public OrcMapColumnVector(MapColumnVector hiveVector, MapType type) { - super(hiveVector); + public OrcMapColumnVector(MapColumnVector hiveVector, int[] selected, MapType type) { + super(hiveVector, selected); this.hiveVector = hiveVector; - this.keyPaimonVector = createPaimonVector(hiveVector.keys, type.getKeyType()); - this.valuePaimonVector = createPaimonVector(hiveVector.values, type.getValueType()); + this.keyPaimonVector = createPaimonVector(hiveVector.keys, selected, type.getKeyType()); + this.valuePaimonVector = + createPaimonVector(hiveVector.values, selected, type.getValueType()); } @Override public InternalMap getMap(int i) { + i = rowMapper(i); long offset = hiveVector.offsets[i]; long length = hiveVector.lengths[i]; return new ColumnarMap(keyPaimonVector, valuePaimonVector, (int) offset, (int) length); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java index caa22467f9c3..2572dcb565f2 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java @@ -31,18 +31,20 @@ public class OrcRowColumnVector extends AbstractOrcColumnVector private final VectorizedColumnBatch batch; - public OrcRowColumnVector(StructColumnVector hiveVector, RowType type) { - super(hiveVector); + public OrcRowColumnVector(StructColumnVector hiveVector, int[] selected, RowType type) { + super(hiveVector, selected); int len = hiveVector.fields.length; ColumnVector[] paimonVectors = new ColumnVector[len]; for (int i = 0; i < len; i++) { - paimonVectors[i] = createPaimonVector(hiveVector.fields[i], type.getTypeAt(i)); + paimonVectors[i] = + createPaimonVector(hiveVector.fields[i], selected, type.getTypeAt(i)); } this.batch = new VectorizedColumnBatch(paimonVectors); } @Override public ColumnarRow getRow(int i) { + i = rowMapper(i); return new ColumnarRow(batch, i); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java index dd8ac08f2f57..758ef5d6e94c 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java @@ -33,13 +33,14 @@ public class OrcTimestampColumnVector extends AbstractOrcColumnVector private final TimestampColumnVector vector; - public OrcTimestampColumnVector(ColumnVector vector) { - super(vector); + public OrcTimestampColumnVector(ColumnVector vector, int[] selected) { + super(vector, selected); this.vector = (TimestampColumnVector) vector; } @Override public Timestamp getTimestamp(int i, int precision) { + i = rowMapper(i); int index = vector.isRepeating ? 0 : i; return DateTimeUtils.toInternal(vector.time[index], vector.nanos[index] % 1_000_000); } From 462edc6b5cc58f7821fd4153dfe9ac38ce4f6bcf Mon Sep 17 00:00:00 2001 From: "ranxianglei.rxl" Date: Sun, 22 Sep 2024 20:38:18 +0800 Subject: [PATCH 03/27] [format][orc] miss tolerateMissingSchema --- .../java/org/apache/paimon/format/orc/OrcReaderFactory.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index 3d583333ad0d..21a7498704b2 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -268,7 +268,9 @@ private static RecordReader createRecordReader( .schema(schema) .range(offsetAndLength.getLeft(), offsetAndLength.getRight()) .useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf)) - .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf)); + .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf)) + .tolerateMissingSchema( + OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf)); if (!conjunctPredicates.isEmpty()) { options.useSelected(true); options.allowSARGToFilter(true); From 2226fb9898cf3f29990c404d0c428954a2b53ab6 Mon Sep 17 00:00:00 2001 From: "ranxianglei.rxl" Date: Sun, 22 Sep 2024 21:08:24 +0800 Subject: [PATCH 04/27] [format][orc] fix orc selected close for no filter condition --- .../java/org/apache/paimon/format/orc/OrcReaderFactory.java | 2 +- .../paimon/format/orc/reader/AbstractOrcColumnVector.java | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index 21a7498704b2..9071fe3b2525 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -123,7 +123,7 @@ public OrcReaderBatch createReaderBatch( for (int i = 0; i < vectors.length; i++) { String name = tableFieldNames.get(i); DataType type = tableFieldTypes.get(i); - int[] selected = orcBatch.getSelected(); + int[] selected = orcBatch.selectedInUse ? orcBatch.getSelected() : null; vectors[i] = createPaimonVector( orcBatch.cols[tableFieldNames.indexOf(name)], selected, type); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java index 5b612005f1c9..587ffc1c459d 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java @@ -48,7 +48,10 @@ public abstract class AbstractOrcColumnVector } protected int rowMapper(int r) { - return selected[r]; + if (this.selected != null) { + return selected[r]; + } + return r; } @Override From ee915c893229ca7c308090004ffc31ec987e7990 Mon Sep 17 00:00:00 2001 From: "ranxianglei.rxl" Date: Fri, 27 Sep 2024 11:39:48 +0800 Subject: [PATCH 05/27] [orc] keep useSelected and allowSARGToFilter close default, or deletion vectors would not work --- .../java/org/apache/paimon/format/orc/OrcReaderFactory.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index 9071fe3b2525..2b9e88d89ccd 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -272,8 +272,10 @@ private static RecordReader createRecordReader( .tolerateMissingSchema( OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf)); if (!conjunctPredicates.isEmpty()) { - options.useSelected(true); - options.allowSARGToFilter(true); + // TODO fix it , if open this option,future deletion vectors would not work, + // cased by getRowNumber would be changed . + options.useSelected(OrcConf.READER_USE_SELECTED.getBoolean(conf)); + options.allowSARGToFilter(OrcConf.ALLOW_SARG_TO_FILTER.getBoolean(conf)); } // configure filters if (!conjunctPredicates.isEmpty()) { From 2920fc953805d99d07053b48551602f86a12fc2e Mon Sep 17 00:00:00 2001 From: "ranxianglei.rxl" Date: Tue, 15 Oct 2024 11:55:49 +0800 Subject: [PATCH 06/27] [format][orc] VectorizedRowBatch to OrcColumnVector for selected rows can be saw. --- .../paimon/format/orc/OrcReaderFactory.java | 3 +- .../orc/reader/AbstractOrcColumnVector.java | 33 ++++++++++--------- .../orc/reader/OrcArrayColumnVector.java | 8 +++-- .../orc/reader/OrcBytesColumnVector.java | 5 +-- .../orc/reader/OrcDecimalColumnVector.java | 5 +-- .../orc/reader/OrcDoubleColumnVector.java | 5 +-- .../OrcLegacyTimestampColumnVector.java | 5 +-- .../orc/reader/OrcLongColumnVector.java | 5 +-- .../format/orc/reader/OrcMapColumnVector.java | 10 +++--- .../format/orc/reader/OrcRowColumnVector.java | 8 +++-- .../orc/reader/OrcTimestampColumnVector.java | 5 +-- 11 files changed, 52 insertions(+), 40 deletions(-) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index 2b9e88d89ccd..6b1590f03109 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -123,10 +123,9 @@ public OrcReaderBatch createReaderBatch( for (int i = 0; i < vectors.length; i++) { String name = tableFieldNames.get(i); DataType type = tableFieldTypes.get(i); - int[] selected = orcBatch.selectedInUse ? orcBatch.getSelected() : null; vectors[i] = createPaimonVector( - orcBatch.cols[tableFieldNames.indexOf(name)], selected, type); + orcBatch.cols[tableFieldNames.indexOf(name)], orcBatch, type); } return new OrcReaderBatch(filePath, orcBatch, new VectorizedColumnBatch(vectors), recycler); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java index 587ffc1c459d..377ff37a2e68 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; /** This column vector is used to adapt hive's ColumnVector to Paimon's ColumnVector. */ public abstract class AbstractOrcColumnVector @@ -40,18 +41,18 @@ public abstract class AbstractOrcColumnVector private final ColumnVector vector; - private final int[] selected; + private final VectorizedRowBatch orcBatch; - AbstractOrcColumnVector(ColumnVector vector, int[] selected) { + AbstractOrcColumnVector(ColumnVector vector, VectorizedRowBatch orcBatch) { this.vector = vector; - this.selected = selected; + this.orcBatch = orcBatch; } protected int rowMapper(int r) { - if (this.selected != null) { - return selected[r]; + if (vector.isRepeating) { + return 0; } - return r; + return this.orcBatch.selectedInUse ? this.orcBatch.getSelected()[r] : r; } @Override @@ -60,29 +61,29 @@ public boolean isNullAt(int i) { } public static org.apache.paimon.data.columnar.ColumnVector createPaimonVector( - ColumnVector vector, int[] selected, DataType dataType) { + ColumnVector vector, VectorizedRowBatch orcBatch, DataType dataType) { if (vector instanceof LongColumnVector) { if (dataType.getTypeRoot() == DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) { - return new OrcLegacyTimestampColumnVector((LongColumnVector) vector, selected); + return new OrcLegacyTimestampColumnVector((LongColumnVector) vector, orcBatch); } else { - return new OrcLongColumnVector((LongColumnVector) vector, selected); + return new OrcLongColumnVector((LongColumnVector) vector, orcBatch); } } else if (vector instanceof DoubleColumnVector) { - return new OrcDoubleColumnVector((DoubleColumnVector) vector, selected); + return new OrcDoubleColumnVector((DoubleColumnVector) vector, orcBatch); } else if (vector instanceof BytesColumnVector) { - return new OrcBytesColumnVector((BytesColumnVector) vector, selected); + return new OrcBytesColumnVector((BytesColumnVector) vector, orcBatch); } else if (vector instanceof DecimalColumnVector) { - return new OrcDecimalColumnVector((DecimalColumnVector) vector, selected); + return new OrcDecimalColumnVector((DecimalColumnVector) vector, orcBatch); } else if (vector instanceof TimestampColumnVector) { - return new OrcTimestampColumnVector(vector, selected); + return new OrcTimestampColumnVector(vector, orcBatch); } else if (vector instanceof ListColumnVector) { return new OrcArrayColumnVector( - (ListColumnVector) vector, selected, (ArrayType) dataType); + (ListColumnVector) vector, orcBatch, (ArrayType) dataType); } else if (vector instanceof StructColumnVector) { return new OrcRowColumnVector( - (StructColumnVector) vector, selected, (RowType) dataType); + (StructColumnVector) vector, orcBatch, (RowType) dataType); } else if (vector instanceof MapColumnVector) { - return new OrcMapColumnVector((MapColumnVector) vector, selected, (MapType) dataType); + return new OrcMapColumnVector((MapColumnVector) vector, orcBatch, (MapType) dataType); } else { throw new UnsupportedOperationException( "Unsupported vector: " + vector.getClass().getName()); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java index ade154221835..25a1935f3e4b 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java @@ -24,6 +24,7 @@ import org.apache.paimon.types.ArrayType; import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; /** This column vector is used to adapt hive's ListColumnVector to Paimon's ArrayColumnVector. */ public class OrcArrayColumnVector extends AbstractOrcColumnVector @@ -32,10 +33,11 @@ public class OrcArrayColumnVector extends AbstractOrcColumnVector private final ListColumnVector hiveVector; private final ColumnVector paimonVector; - public OrcArrayColumnVector(ListColumnVector hiveVector, int[] selected, ArrayType type) { - super(hiveVector, selected); + public OrcArrayColumnVector( + ListColumnVector hiveVector, VectorizedRowBatch orcBatch, ArrayType type) { + super(hiveVector, orcBatch); this.hiveVector = hiveVector; - this.paimonVector = createPaimonVector(hiveVector.child, selected, type.getElementType()); + this.paimonVector = createPaimonVector(hiveVector.child, orcBatch, type.getElementType()); } @Override diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcBytesColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcBytesColumnVector.java index a2664f3b8e45..92b4853aaae4 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcBytesColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcBytesColumnVector.java @@ -19,6 +19,7 @@ package org.apache.paimon.format.orc.reader; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; /** This column vector is used to adapt hive's BytesColumnVector to Paimon's BytesColumnVector. */ public class OrcBytesColumnVector extends AbstractOrcColumnVector @@ -26,8 +27,8 @@ public class OrcBytesColumnVector extends AbstractOrcColumnVector private final BytesColumnVector vector; - public OrcBytesColumnVector(BytesColumnVector vector, int[] selected) { - super(vector, selected); + public OrcBytesColumnVector(BytesColumnVector vector, VectorizedRowBatch orcBatch) { + super(vector, orcBatch); this.vector = vector; } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDecimalColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDecimalColumnVector.java index 30963bb29fee..c8545723caf0 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDecimalColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDecimalColumnVector.java @@ -21,6 +21,7 @@ import org.apache.paimon.data.Decimal; import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import java.math.BigDecimal; @@ -32,8 +33,8 @@ public class OrcDecimalColumnVector extends AbstractOrcColumnVector private final DecimalColumnVector vector; - public OrcDecimalColumnVector(DecimalColumnVector vector, int[] selected) { - super(vector, selected); + public OrcDecimalColumnVector(DecimalColumnVector vector, VectorizedRowBatch orcBatch) { + super(vector, orcBatch); this.vector = vector; } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDoubleColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDoubleColumnVector.java index 0353e4aea9f4..3e19b137a222 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDoubleColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDoubleColumnVector.java @@ -19,6 +19,7 @@ package org.apache.paimon.format.orc.reader; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; /** * This column vector is used to adapt hive's DoubleColumnVector to Paimon's float and double @@ -30,8 +31,8 @@ public class OrcDoubleColumnVector extends AbstractOrcColumnVector private final DoubleColumnVector vector; - public OrcDoubleColumnVector(DoubleColumnVector vector, int[] selected) { - super(vector, selected); + public OrcDoubleColumnVector(DoubleColumnVector vector, VectorizedRowBatch orcBatch) { + super(vector, orcBatch); this.vector = vector; } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLegacyTimestampColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLegacyTimestampColumnVector.java index 508895632374..eb3e014f2fcf 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLegacyTimestampColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLegacyTimestampColumnVector.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import java.time.LocalDateTime; @@ -34,8 +35,8 @@ public class OrcLegacyTimestampColumnVector extends AbstractOrcColumnVector private final LongColumnVector hiveVector; - OrcLegacyTimestampColumnVector(LongColumnVector vector, int[] selected) { - super(vector, selected); + OrcLegacyTimestampColumnVector(LongColumnVector vector, VectorizedRowBatch orcBatch) { + super(vector, orcBatch); this.hiveVector = vector; } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLongColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLongColumnVector.java index 96f6922029a3..76c1c198d21d 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLongColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLongColumnVector.java @@ -19,6 +19,7 @@ package org.apache.paimon.format.orc.reader; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; /** * This column vector is used to adapt hive's LongColumnVector to Paimon's boolean, byte, short, int @@ -33,8 +34,8 @@ public class OrcLongColumnVector extends AbstractOrcColumnVector private final LongColumnVector vector; - public OrcLongColumnVector(LongColumnVector vector, int[] selected) { - super(vector, selected); + public OrcLongColumnVector(LongColumnVector vector, VectorizedRowBatch orcBatch) { + super(vector, orcBatch); this.vector = vector; } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcMapColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcMapColumnVector.java index e45aea60b59c..c7245275fdd2 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcMapColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcMapColumnVector.java @@ -24,6 +24,7 @@ import org.apache.paimon.types.MapType; import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; /** This column vector is used to adapt hive's MapColumnVector to Paimon's MapColumnVector. */ public class OrcMapColumnVector extends AbstractOrcColumnVector @@ -33,12 +34,13 @@ public class OrcMapColumnVector extends AbstractOrcColumnVector private final ColumnVector keyPaimonVector; private final ColumnVector valuePaimonVector; - public OrcMapColumnVector(MapColumnVector hiveVector, int[] selected, MapType type) { - super(hiveVector, selected); + public OrcMapColumnVector( + MapColumnVector hiveVector, VectorizedRowBatch orcBatch, MapType type) { + super(hiveVector, orcBatch); this.hiveVector = hiveVector; - this.keyPaimonVector = createPaimonVector(hiveVector.keys, selected, type.getKeyType()); + this.keyPaimonVector = createPaimonVector(hiveVector.keys, orcBatch, type.getKeyType()); this.valuePaimonVector = - createPaimonVector(hiveVector.values, selected, type.getValueType()); + createPaimonVector(hiveVector.values, orcBatch, type.getValueType()); } @Override diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java index 2572dcb565f2..6c73c9fdbe0d 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java @@ -24,6 +24,7 @@ import org.apache.paimon.types.RowType; import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; /** This column vector is used to adapt hive's StructColumnVector to Flink's RowColumnVector. */ public class OrcRowColumnVector extends AbstractOrcColumnVector @@ -31,13 +32,14 @@ public class OrcRowColumnVector extends AbstractOrcColumnVector private final VectorizedColumnBatch batch; - public OrcRowColumnVector(StructColumnVector hiveVector, int[] selected, RowType type) { - super(hiveVector, selected); + public OrcRowColumnVector( + StructColumnVector hiveVector, VectorizedRowBatch orcBatch, RowType type) { + super(hiveVector, orcBatch); int len = hiveVector.fields.length; ColumnVector[] paimonVectors = new ColumnVector[len]; for (int i = 0; i < len; i++) { paimonVectors[i] = - createPaimonVector(hiveVector.fields[i], selected, type.getTypeAt(i)); + createPaimonVector(hiveVector.fields[i], orcBatch, type.getTypeAt(i)); } this.batch = new VectorizedColumnBatch(paimonVectors); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java index 758ef5d6e94c..8840a02a8c83 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; /** * This column vector is used to adapt hive's TimestampColumnVector to Paimon's @@ -33,8 +34,8 @@ public class OrcTimestampColumnVector extends AbstractOrcColumnVector private final TimestampColumnVector vector; - public OrcTimestampColumnVector(ColumnVector vector, int[] selected) { - super(vector, selected); + public OrcTimestampColumnVector(ColumnVector vector, VectorizedRowBatch orcBatch) { + super(vector, orcBatch); this.vector = (TimestampColumnVector) vector; } From 8a89649cecb2e9c886e97cf7ef4a9baf4f1a0a7a Mon Sep 17 00:00:00 2001 From: "ranxianglei.rxl" Date: Tue, 15 Oct 2024 16:09:59 +0800 Subject: [PATCH 07/27] [format][orc] remove all isRepeating --- .../format/orc/reader/AbstractOrcColumnVector.java | 2 +- .../paimon/format/orc/reader/OrcBytesColumnVector.java | 5 ++--- .../format/orc/reader/OrcDecimalColumnVector.java | 3 +-- .../format/orc/reader/OrcDoubleColumnVector.java | 4 ++-- .../orc/reader/OrcLegacyTimestampColumnVector.java | 3 +-- .../paimon/format/orc/reader/OrcLongColumnVector.java | 10 +++++----- .../format/orc/reader/OrcTimestampColumnVector.java | 3 +-- 7 files changed, 13 insertions(+), 17 deletions(-) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java index 377ff37a2e68..0557a72230cc 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/AbstractOrcColumnVector.java @@ -57,7 +57,7 @@ protected int rowMapper(int r) { @Override public boolean isNullAt(int i) { - return !vector.noNulls && vector.isNull[vector.isRepeating ? 0 : i]; + return !vector.noNulls && vector.isNull[rowMapper(i)]; } public static org.apache.paimon.data.columnar.ColumnVector createPaimonVector( diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcBytesColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcBytesColumnVector.java index 92b4853aaae4..7f812bb5628b 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcBytesColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcBytesColumnVector.java @@ -34,11 +34,10 @@ public OrcBytesColumnVector(BytesColumnVector vector, VectorizedRowBatch orcBatc @Override public Bytes getBytes(int i) { - int rowId = vector.isRepeating ? 0 : i; - int selectedRowId = rowMapper(rowId); + int rowId = rowMapper(i); byte[][] data = vector.vector; int[] start = vector.start; int[] length = vector.length; - return new Bytes(data[selectedRowId], start[selectedRowId], length[selectedRowId]); + return new Bytes(data[rowId], start[rowId], length[rowId]); } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDecimalColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDecimalColumnVector.java index c8545723caf0..382c19f45be1 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDecimalColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDecimalColumnVector.java @@ -41,8 +41,7 @@ public OrcDecimalColumnVector(DecimalColumnVector vector, VectorizedRowBatch orc @Override public Decimal getDecimal(int i, int precision, int scale) { i = rowMapper(i); - BigDecimal data = - vector.vector[vector.isRepeating ? 0 : i].getHiveDecimal().bigDecimalValue(); + BigDecimal data = vector.vector[i].getHiveDecimal().bigDecimalValue(); return Decimal.fromBigDecimal(data, precision, scale); } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDoubleColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDoubleColumnVector.java index 3e19b137a222..f26dac6de9da 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDoubleColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcDoubleColumnVector.java @@ -39,12 +39,12 @@ public OrcDoubleColumnVector(DoubleColumnVector vector, VectorizedRowBatch orcBa @Override public double getDouble(int i) { i = rowMapper(i); - return vector.vector[vector.isRepeating ? 0 : i]; + return vector.vector[i]; } @Override public float getFloat(int i) { i = rowMapper(i); - return (float) vector.vector[vector.isRepeating ? 0 : i]; + return (float) vector.vector[i]; } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLegacyTimestampColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLegacyTimestampColumnVector.java index eb3e014f2fcf..5107e722edb4 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLegacyTimestampColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLegacyTimestampColumnVector.java @@ -43,8 +43,7 @@ public class OrcLegacyTimestampColumnVector extends AbstractOrcColumnVector @Override public Timestamp getTimestamp(int i, int precision) { i = rowMapper(i); - int index = hiveVector.isRepeating ? 0 : i; - java.sql.Timestamp timestamp = toTimestamp(hiveVector.vector[index]); + java.sql.Timestamp timestamp = toTimestamp(hiveVector.vector[i]); return Timestamp.fromSQLTimestamp(timestamp); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLongColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLongColumnVector.java index 76c1c198d21d..c289b74c58b2 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLongColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcLongColumnVector.java @@ -42,30 +42,30 @@ public OrcLongColumnVector(LongColumnVector vector, VectorizedRowBatch orcBatch) @Override public long getLong(int i) { i = rowMapper(i); - return vector.vector[vector.isRepeating ? 0 : i]; + return vector.vector[i]; } @Override public boolean getBoolean(int i) { i = rowMapper(i); - return vector.vector[vector.isRepeating ? 0 : i] == 1; + return vector.vector[i] == 1; } @Override public byte getByte(int i) { i = rowMapper(i); - return (byte) vector.vector[vector.isRepeating ? 0 : i]; + return (byte) vector.vector[i]; } @Override public int getInt(int i) { i = rowMapper(i); - return (int) vector.vector[vector.isRepeating ? 0 : i]; + return (int) vector.vector[i]; } @Override public short getShort(int i) { i = rowMapper(i); - return (short) vector.vector[vector.isRepeating ? 0 : i]; + return (short) vector.vector[i]; } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java index 8840a02a8c83..a6e71d6016f2 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcTimestampColumnVector.java @@ -42,7 +42,6 @@ public OrcTimestampColumnVector(ColumnVector vector, VectorizedRowBatch orcBatch @Override public Timestamp getTimestamp(int i, int precision) { i = rowMapper(i); - int index = vector.isRepeating ? 0 : i; - return DateTimeUtils.toInternal(vector.time[index], vector.nanos[index] % 1_000_000); + return DateTimeUtils.toInternal(vector.time[i], vector.nanos[i] % 1_000_000); } } From c745e557b909ec28c3d6d15fc67b4ffc1fd2c9e1 Mon Sep 17 00:00:00 2001 From: ranxianglei Date: Mon, 11 Nov 2024 10:01:10 +0000 Subject: [PATCH 08/27] [core][format] merge with aa16c2bf1 --- .../org/apache/paimon/format/FileFormat.java | 13 +++++ .../apache/paimon/fs/ObjectCacheManager.java | 57 +++++++++++++++++++ .../apache/paimon/fs/hadoop/HadoopFileIO.java | 27 +-------- .../manifest/ManifestEntrySerializer.java | 7 +++ .../apache/paimon/manifest/ManifestFile.java | 55 +++++++++++++++++- .../operation/AbstractFileStoreScan.java | 31 +++++++++- .../paimon/operation/FileStoreScan.java | 2 + .../table/source/AbstractDataTableScan.java | 6 ++ .../paimon/table/source/InnerTableScan.java | 9 +++ .../table/source/snapshot/SnapshotReader.java | 2 + .../source/snapshot/SnapshotReaderImpl.java | 13 ++++- .../paimon/table/system/AuditLogTable.java | 6 ++ .../org/apache/paimon/utils/ObjectsCache.java | 7 +++ .../paimon/format/orc/OrcFileFormat.java | 28 +++++++-- .../format/orc/reader/OrcRowColumnVector.java | 2 +- 15 files changed, 228 insertions(+), 37 deletions(-) create mode 100644 paimon-common/src/main/java/org/apache/paimon/fs/ObjectCacheManager.java diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java index 78dfe1c0cad1..732f1f01e1a7 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.format.FileFormatFactory.FormatContext; +import org.apache.paimon.fs.ObjectCacheManager; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.statistics.SimpleColStatsCollector; @@ -28,6 +29,7 @@ import javax.annotation.Nullable; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -40,6 +42,9 @@ */ public abstract class FileFormat { + private static final ObjectCacheManager formatFactoryCache = + ObjectCacheManager.newObjectCacheManager(Duration.ofDays(365), 1000); + protected String formatIdentifier; protected FileFormat(String formatIdentifier) { @@ -92,9 +97,17 @@ public static FileFormat fromIdentifier(String identifier, FormatContext context private static Optional fromIdentifier( String formatIdentifier, FormatContext context, ClassLoader classLoader) { + + FileFormatFactory fileFormatFactory = + formatFactoryCache.getIfPresent(formatIdentifier.toLowerCase()); + if (fileFormatFactory != null) { + return Optional.of(fileFormatFactory.create(context)); + } + ServiceLoader serviceLoader = ServiceLoader.load(FileFormatFactory.class, classLoader); for (FileFormatFactory factory : serviceLoader) { + formatFactoryCache.put(factory.identifier(), factory); if (factory.identifier().equals(formatIdentifier.toLowerCase())) { return Optional.of(factory.create(context)); } diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/ObjectCacheManager.java b/paimon-common/src/main/java/org/apache/paimon/fs/ObjectCacheManager.java new file mode 100644 index 000000000000..f157578107eb --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fs/ObjectCacheManager.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fs; + +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; + +import java.time.Duration; +import java.util.function.Function; + +/** + * Sample Object Cache Manager . + * + * @param + * @param + */ +public class ObjectCacheManager { + private final Cache cache; + + private ObjectCacheManager(Duration timeout, int maxSize) { + this.cache = Caffeine.newBuilder().maximumSize(maxSize).expireAfterWrite(timeout).build(); + } + + public static ObjectCacheManager newObjectCacheManager( + Duration timeout, int maxSize) { + return new ObjectCacheManager<>(timeout, maxSize); + } + + public ObjectCacheManager put(K k, V v) { + this.cache.put(k, v); + return this; + } + + public V get(K k, Function creator) { + return this.cache.get(k, creator); + } + + public V getIfPresent(K k) { + return this.cache.getIfPresent(k); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java index 70325ee69635..6b8104f1b6da 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java @@ -27,7 +27,6 @@ import org.apache.paimon.fs.SeekableInputStream; import org.apache.paimon.hadoop.SerializableConfiguration; import org.apache.paimon.utils.FunctionWithException; -import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.ReflectionUtils; import org.apache.hadoop.fs.FSDataInputStream; @@ -39,10 +38,7 @@ import java.io.OutputStreamWriter; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.net.URI; import java.nio.charset.StandardCharsets; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; /** Hadoop {@link FileIO}. */ @@ -52,8 +48,6 @@ public class HadoopFileIO implements FileIO { protected SerializableConfiguration hadoopConf; - protected transient volatile Map, FileSystem> fsMap; - @VisibleForTesting public void setFileSystem(Path path, FileSystem fs) throws IOException { org.apache.hadoop.fs.Path hadoopPath = path(path); @@ -149,26 +143,7 @@ private FileSystem getFileSystem( org.apache.hadoop.fs.Path path, FunctionWithException creator) throws IOException { - if (fsMap == null) { - synchronized (this) { - if (fsMap == null) { - fsMap = new ConcurrentHashMap<>(); - } - } - } - - Map, FileSystem> map = fsMap; - - URI uri = path.toUri(); - String scheme = uri.getScheme(); - String authority = uri.getAuthority(); - Pair key = Pair.of(scheme, authority); - FileSystem fs = map.get(key); - if (fs == null) { - fs = creator.apply(path); - map.put(key, fs); - } - return fs; + return creator.apply(path); } protected FileSystem createFileSystem(org.apache.hadoop.fs.Path path) throws IOException { diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java index 2c3ba2aeaab3..9bee11b64bbe 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java @@ -37,11 +37,18 @@ public class ManifestEntrySerializer extends VersionedObjectSerializer filters; + private final String formatIdentifier; + + public FormatReaderFactoryKey( + String formatIdentifier, RowType entryType, List filters) { + this.entryType = entryType; + this.filters = filters; + this.formatIdentifier = formatIdentifier; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + FormatReaderFactoryKey that = (FormatReaderFactoryKey) o; + return Objects.equals(entryType, that.entryType) + && Objects.equals(filters, that.filters) + && Objects.equals(formatIdentifier, that.formatIdentifier); + } + + @Override + public int hashCode() { + return Objects.hash(entryType, filters, formatIdentifier); + } + } + + private static final ObjectCacheManager + readers = ObjectCacheManager.newObjectCacheManager(Duration.ofDays(365), 1000); + private static final ObjectCacheManager + writers = ObjectCacheManager.newObjectCacheManager(Duration.ofDays(365), 1000); + + public ManifestFile create(List filters) { + String formatIdentifier = this.fileFormat.getFormatIdentifier(); RowType entryType = VersionedObjectSerializer.versionType(ManifestEntry.SCHEMA); + FormatReaderFactoryKey formatReaderFactoryKey = + new FormatReaderFactoryKey(formatIdentifier, entryType, filters); return new ManifestFile( fileIO, schemaManager, partitionType, - new ManifestEntrySerializer(), + ManifestEntrySerializer.getInstance(), entryType, - fileFormat.createReaderFactory(entryType), - fileFormat.createWriterFactory(entryType), + readers.get( + formatReaderFactoryKey, + (ignore) -> fileFormat.createReaderFactory(entryType, filters)), + writers.get( + formatReaderFactoryKey, + (ignore) -> fileFormat.createWriterFactory(entryType)), compression, pathFactory.manifestFileFactory(), suggestedFileSize, diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index d043932810c3..394eb0b3ffa4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -35,9 +35,12 @@ import org.apache.paimon.operation.metrics.ScanStats; import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.source.ScanMode; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Filter; @@ -82,6 +85,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { private Snapshot specifiedSnapshot = null; private Filter bucketFilter = null; + private List buckets; private List specifiedManifests = null; protected ScanMode scanMode = ScanMode.ALL; private Filter levelFilter = null; @@ -136,6 +140,14 @@ public FileStoreScan withPartitionFilter(PartitionPredicate predicate) { @Override public FileStoreScan withBucket(int bucket) { this.bucketFilter = i -> i == bucket; + this.buckets = Collections.singletonList(bucket); + return this; + } + + @Override + public FileStoreScan withBuckets(List buckets) { + this.bucketFilter = buckets::contains; + this.buckets = buckets; return this; } @@ -416,7 +428,7 @@ private boolean filterMergedManifestEntry(ManifestEntry entry) { public List readManifest(ManifestFileMeta manifest) { List entries = manifestFileFactory - .create() + .create(createPushDownFilter(buckets, numOfBuckets)) .read( manifest.fileName(), manifest.fileSize(), @@ -480,6 +492,23 @@ private static Filter createCacheRowFilter( }; } + /** + * Read the corresponding entries based on the current required partition and bucket. + * + *

Implemented to {@link InternalRow} is for performance (No deserialization). + */ + private static List createPushDownFilter(List buckets, int numOfBuckets) { + if (buckets == null || buckets.isEmpty()) { + return null; + } + List predicates = new ArrayList<>(); + PredicateBuilder predicateBuilder = + new PredicateBuilder( + RowType.of(new DataType[] {new IntType()}, new String[] {"_BUCKET"})); + predicates.add(predicateBuilder.in(0, new ArrayList<>(buckets))); + return predicates; + } + /** * Read the corresponding entries based on the current required partition and bucket. * diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java index 197b4dff4962..4f3cd6e86463 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java @@ -55,6 +55,8 @@ public interface FileStoreScan { FileStoreScan withBucket(int bucket); + FileStoreScan withBuckets(List buckets); + FileStoreScan withBucketFilter(Filter bucketFilter); FileStoreScan withPartitionBucket(BinaryRow partition, int bucket); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java index ba1bc6588f68..1e292de12a40 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java @@ -80,6 +80,12 @@ public AbstractDataTableScan withBucketFilter(Filter bucketFilter) { return this; } + @Override + public AbstractDataTableScan withBuckets(List buckets) { + snapshotReader.withBuckets(buckets); + return this; + } + @Override public AbstractDataTableScan withPartitionFilter(Map partitionSpec) { snapshotReader.withPartitionFilter(partitionSpec); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java index 00a4fc0cde18..5dfd83640f5d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java @@ -23,6 +23,7 @@ import org.apache.paimon.predicate.Predicate; import org.apache.paimon.utils.Filter; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -47,6 +48,14 @@ default InnerTableScan withBucketFilter(Filter bucketFilter) { return this; } + default InnerTableScan withBucket(Integer bucket) { + return withBuckets(Collections.singletonList(bucket)); + } + + default InnerTableScan withBuckets(List buckets) { + throw new RuntimeException("not impl withBuckets for " + this.getClass().getName()); + } + default InnerTableScan withLevelFilter(Filter levelFilter) { return this; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java index bd07f49fda01..ec53fdfb1b88 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java @@ -80,6 +80,8 @@ public interface SnapshotReader { SnapshotReader withBucket(int bucket); + SnapshotReader withBuckets(List buckets); + SnapshotReader withBucketFilter(Filter bucketFilter); SnapshotReader withDataFileNameFilter(Filter fileNameFilter); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index b2fd1c7850f5..2cf4164f7593 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -245,6 +245,11 @@ public SnapshotReader withBucket(int bucket) { return this; } + public SnapshotReader withBuckets(List buckets) { + scan.withBuckets(buckets); + return this; + } + @Override public SnapshotReader withBucketFilter(Filter bucketFilter) { scan.withBucketFilter(bucketFilter); @@ -271,7 +276,13 @@ public SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubt Math.abs(file.hashCode() % numberOfParallelSubtasks) == indexOfThisSubtask); } else { - withBucketFilter(bucket -> bucket % numberOfParallelSubtasks == indexOfThisSubtask); + List buckets = new ArrayList<>(); + for (int bucket = 0; bucket < numberOfParallelSubtasks; bucket++) { + if (bucket % numberOfParallelSubtasks == indexOfThisSubtask) { + buckets.add(bucket); + } + } + withBuckets(buckets); } return this; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index d9cf80289953..92556dd51704 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -334,6 +334,12 @@ public SnapshotReader withBucket(int bucket) { return this; } + @Override + public SnapshotReader withBuckets(List buckets) { + snapshotReader.withBuckets(buckets); + return this; + } + @Override public SnapshotReader withBucketFilter(Filter bucketFilter) { snapshotReader.withBucketFilter(bucketFilter); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java index 8c490e008baa..ff1c76cb0a03 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java @@ -28,6 +28,9 @@ import org.apache.paimon.memory.MemorySegmentSource; import org.apache.paimon.types.RowType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -41,6 +44,7 @@ /** Cache records to {@link SegmentsCache} by compacted serializer. */ @ThreadSafe public class ObjectsCache { + protected static final Logger LOG = LoggerFactory.getLogger(ObjectsCache.class); private final SegmentsCache cache; private final ObjectSerializer projectedSerializer; @@ -72,6 +76,9 @@ public List read( if (segments != null) { return readFromSegments(segments, readFilter); } else { + if (LOG.isDebugEnabled()) { + LOG.debug("not match cache key {}", key); + } if (fileSize == null) { fileSize = fileSizeFunction.apply(key); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java index c2db03aa16ed..e2e2c057e1a6 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java @@ -31,6 +31,7 @@ import org.apache.paimon.format.orc.reader.OrcSplitReaderUtil; import org.apache.paimon.format.orc.writer.RowDataVectorizer; import org.apache.paimon.format.orc.writer.Vectorizer; +import org.apache.paimon.fs.ObjectCacheManager; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; @@ -44,12 +45,14 @@ import org.apache.paimon.types.MultisetType; import org.apache.paimon.types.RowType; +import org.apache.hadoop.conf.Configuration; import org.apache.orc.OrcConf; import org.apache.orc.TypeDescription; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -70,13 +73,29 @@ public class OrcFileFormat extends FileFormat { private final int readBatchSize; private final int writeBatchSize; + private static final org.apache.hadoop.conf.Configuration emptyConf = + new org.apache.hadoop.conf.Configuration(); + private static final ObjectCacheManager configCache = + ObjectCacheManager.newObjectCacheManager(Duration.ofDays(365), 1000); + + static { + emptyConf.set("paimon.empty.configuration", "paimon.empty.configuration"); + } + public OrcFileFormat(FormatContext formatContext) { super(IDENTIFIER); this.orcProperties = getOrcProperties(formatContext.formatOptions(), formatContext); - this.readerConf = new org.apache.hadoop.conf.Configuration(); - this.orcProperties.forEach((k, v) -> readerConf.set(k.toString(), v.toString())); - this.writerConf = new org.apache.hadoop.conf.Configuration(); - this.orcProperties.forEach((k, v) -> writerConf.set(k.toString(), v.toString())); + Configuration conf; + Configuration cachedConf = configCache.getIfPresent(orcProperties); + if (cachedConf != null) { + conf = cachedConf; + } else { + conf = new org.apache.hadoop.conf.Configuration(emptyConf); + this.orcProperties.forEach((k, v) -> conf.set(k.toString(), v.toString())); + configCache.put(orcProperties, conf); + } + this.readerConf = conf; + this.writerConf = conf; this.readBatchSize = formatContext.readBatchSize(); this.writeBatchSize = formatContext.writeBatchSize(); } @@ -146,7 +165,6 @@ public FormatWriterFactory createWriterFactory(RowType type) { private static Properties getOrcProperties(Options options, FormatContext formatContext) { Properties orcProperties = new Properties(); - Properties properties = new Properties(); options.addAllToProperties(properties); properties.forEach((k, v) -> orcProperties.put(IDENTIFIER + "." + k, v)); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java index 6c73c9fdbe0d..b925f5b536b1 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java @@ -46,7 +46,7 @@ public OrcRowColumnVector( @Override public ColumnarRow getRow(int i) { - i = rowMapper(i); + // no need to call rowMapper here . return new ColumnarRow(batch, i); } From bbdd316ed2450fe77672b4f1d470be65d5663df9 Mon Sep 17 00:00:00 2001 From: "ranxianglei.rxl" Date: Mon, 11 Nov 2024 19:11:01 +0800 Subject: [PATCH 09/27] [core] fix AuditLogTable merge error --- .../main/java/org/apache/paimon/table/system/AuditLogTable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index c7fb56c5b4a3..a11e88074932 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -332,7 +332,7 @@ public SnapshotReader withBucket(int bucket) { @Override public SnapshotReader withBuckets(List buckets) { - snapshotReader.withBuckets(buckets); + wrapped.withBuckets(buckets); return this; } From 10ef09c343e149b9a13dc43ff47239de9222026c Mon Sep 17 00:00:00 2001 From: ranxianglei Date: Tue, 12 Nov 2024 04:01:45 +0000 Subject: [PATCH 10/27] [format] recover HadoopFileIO --- .../apache/paimon/fs/hadoop/HadoopFileIO.java | 27 ++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java index 6b8104f1b6da..c948536660f5 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java @@ -27,6 +27,7 @@ import org.apache.paimon.fs.SeekableInputStream; import org.apache.paimon.hadoop.SerializableConfiguration; import org.apache.paimon.utils.FunctionWithException; +import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.ReflectionUtils; import org.apache.hadoop.fs.FSDataInputStream; @@ -39,6 +40,9 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; +import java.net.URI; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; /** Hadoop {@link FileIO}. */ @@ -48,6 +52,8 @@ public class HadoopFileIO implements FileIO { protected SerializableConfiguration hadoopConf; + protected transient volatile Map, FileSystem> fsMap; + @VisibleForTesting public void setFileSystem(Path path, FileSystem fs) throws IOException { org.apache.hadoop.fs.Path hadoopPath = path(path); @@ -143,7 +149,26 @@ private FileSystem getFileSystem( org.apache.hadoop.fs.Path path, FunctionWithException creator) throws IOException { - return creator.apply(path); + if (fsMap == null) { + synchronized (this) { + if (fsMap == null) { + fsMap = new ConcurrentHashMap<>(); + } + } + } + + Map, FileSystem> map = fsMap; + + URI uri = path.toUri(); + String scheme = uri.getScheme(); + String authority = uri.getAuthority(); + Pair key = Pair.of(scheme, authority); + FileSystem fs = map.get(key); + if (fs == null) { + fs = creator.apply(path); + map.put(key, fs); + } + return fs; } protected FileSystem createFileSystem(org.apache.hadoop.fs.Path path) throws IOException { From a2acbab853f6e1d187a2ab35684476a395ef718f Mon Sep 17 00:00:00 2001 From: "ranxianglei.rxl" Date: Tue, 12 Nov 2024 12:04:54 +0800 Subject: [PATCH 11/27] [format] checkstyle --- .../src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java index c948536660f5..70325ee69635 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java @@ -39,8 +39,8 @@ import java.io.OutputStreamWriter; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.nio.charset.StandardCharsets; import java.net.URI; +import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; From e1c90c70db49a780ac32f10ce033f59cc9e3cf62 Mon Sep 17 00:00:00 2001 From: "ranxianglei.rxl" Date: Tue, 12 Nov 2024 12:17:08 +0800 Subject: [PATCH 12/27] [format][orc] add pushdown option only for reader . --- .../src/main/java/org/apache/orc/OrcConf.java | 15 +++++++++++++++ .../paimon/format/orc/OrcReaderFactory.java | 5 +++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/paimon-format/src/main/java/org/apache/orc/OrcConf.java b/paimon-format/src/main/java/org/apache/orc/OrcConf.java index a7fa1a21bc8c..ee07e45117a4 100644 --- a/paimon-format/src/main/java/org/apache/orc/OrcConf.java +++ b/paimon-format/src/main/java/org/apache/orc/OrcConf.java @@ -305,6 +305,21 @@ public enum OrcConf { + "must have the filter\n" + "reapplied to avoid using unset values in the unselected rows.\n" + "If unsure please leave this as false."), + + READER_ONLY_ALLOW_SARG_TO_FILTER( + "orc.reader.sarg.to.filter", + "orc.reader.sarg.to.filter", + false, + "A boolean flag to determine if a SArg is allowed to become a filter, only for reader."), + READER_ONLY_USE_SELECTED( + "orc.reader.filter.use.selected", + "orc.reader.filter.use.selected", + false, + "A boolean flag to determine if the selected vector is supported by\n" + + "the reading application, only for reader. If false, the output of the ORC reader " + + "must have the filter\n" + + "reapplied to avoid using unset values in the unselected rows.\n" + + "If unsure please leave this as false."), ALLOW_PLUGIN_FILTER( "orc.filter.plugin", "orc.filter.plugin", diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index 4b4c967f4de2..5543fd791075 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -278,8 +278,9 @@ private static RecordReader createRecordReader( if (!conjunctPredicates.isEmpty()) { // TODO fix it , if open this option,future deletion vectors would not work, // cased by getRowNumber would be changed . - options.useSelected(OrcConf.READER_USE_SELECTED.getBoolean(conf)); - options.allowSARGToFilter(OrcConf.ALLOW_SARG_TO_FILTER.getBoolean(conf)); + options.useSelected(OrcConf.READER_ONLY_USE_SELECTED.getBoolean(conf)); + options.allowSARGToFilter( + OrcConf.READER_ONLY_ALLOW_SARG_TO_FILTER.getBoolean(conf)); } // configure filters if (!conjunctPredicates.isEmpty()) { From 8c9a75c3dbb0ecd1e759a082f78ed34100159312 Mon Sep 17 00:00:00 2001 From: ranxianglei Date: Tue, 12 Nov 2024 10:09:00 +0000 Subject: [PATCH 13/27] [core] recover bucket --- .../paimon/table/source/snapshot/SnapshotReaderImpl.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index 14cf6dfae41a..74f257ddcf9d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -277,13 +277,7 @@ public SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubt Math.abs(file.hashCode() % numberOfParallelSubtasks) == indexOfThisSubtask); } else { - List buckets = new ArrayList<>(); - for (int bucket = 0; bucket < numberOfParallelSubtasks; bucket++) { - if (bucket % numberOfParallelSubtasks == indexOfThisSubtask) { - buckets.add(bucket); - } - } - withBuckets(buckets); + withBucketFilter(bucket -> bucket % numberOfParallelSubtasks == indexOfThisSubtask); } return this; } From f71d658ab09dfde580396220a85ef0d22bc039b7 Mon Sep 17 00:00:00 2001 From: ranxianglei Date: Tue, 12 Nov 2024 10:44:58 +0000 Subject: [PATCH 14/27] [core][format] add test for withBuckets and orcFormat --- .../operation/KeyValueFileStoreScanTest.java | 31 +++++++-- .../paimon/format/orc/OrcFileFormatTest.java | 64 +++++++++++++++++++ 2 files changed, 88 insertions(+), 7 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java index ce17450538b1..8d6ab5aad37c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java @@ -40,13 +40,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; @@ -225,6 +219,29 @@ public void testWithBucket() throws Exception { runTestExactMatch(scan, snapshot.id(), expected); } + @Test + public void testWithBuckets() throws Exception { + ThreadLocalRandom random = ThreadLocalRandom.current(); + List data = generateData(random.nextInt(1000) + 1); + Snapshot snapshot = writeData(data); + + int wantedBucket1 = random.nextInt(NUM_BUCKETS); + int wantedBucket2 = random.nextInt(NUM_BUCKETS); + int wantedBucket3 = random.nextInt(NUM_BUCKETS); + List buckets = Arrays.asList(wantedBucket1, wantedBucket2, wantedBucket3); + + FileStoreScan scan = store.newScan(); + scan.withSnapshot(snapshot.id()); + scan.withBuckets(buckets); + + Map expected = + store.toKvMap( + data.stream() + .filter(kv -> buckets.contains(getBucket(kv))) + .collect(Collectors.toList())); + runTestExactMatch(scan, snapshot.id(), expected); + } + @Test public void testWithSnapshot() throws Exception { ThreadLocalRandom random = ThreadLocalRandom.current(); diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java index 46bf6afe6613..216c083eec66 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java @@ -83,4 +83,68 @@ public void testSupportedDataTypes() { dataFields.add(new DataField(index++, "decimal_type", DataTypes.DECIMAL(10, 3))); orc.validateDataFields(new RowType(dataFields)); } + + @Test + public void testCreateCost() { + double createConfCost = createConfigCost(); + for (int i = 0; i < 1000; i++) { + create(); + } + int fix_times = 10_000; + long start = System.nanoTime(); + for (int i = 0; i < fix_times; i++) { + create(); + } + double cost = ((double) (System.nanoTime() - start)) / 1000_000 / fix_times; + assertThat(cost * 500 < createConfCost); + } + + @Test + public void testCreateCostWithRandomConfig() { + double createConfCost = createConfigCost(); + for (int i = 0; i < 1000; i++) { + createRandomConfig(); + } + int fix_times = 10_000; + long start = System.nanoTime(); + for (int i = 0; i < fix_times; i++) { + createRandomConfig(); + } + double cost = ((double) (System.nanoTime() - start)) / 1000_000 / fix_times; + assertThat(cost * 10 < createConfCost); + } + + private double createConfigCost() { + for (int i = 0; i < 1000; i++) { + createConfig(); + } + int fix_times = 10_000; + long start = System.nanoTime(); + for (int i = 0; i < fix_times; i++) { + createConfig(); + } + return ((double) (System.nanoTime() - start)) / 1000_000 / fix_times; + } + + private void createConfig() { + org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); + conf.set("a", "a"); + } + + private void create() { + Options options = new Options(); + options.setString("haha", "1"); + options.setString("compress", "zlib"); + OrcFileFormat orcFileFormat = + new OrcFileFormatFactory().create(new FormatContext(options, 1024)); + } + + private void createRandomConfig() { + Options options = new Options(); + options.setString("haha", "1"); + options.setString("compress", "zlib"); + options.setString("a", Math.random() + ""); + OrcFileFormat orcFileFormat = + new OrcFileFormatFactory().create(new FormatContext(options, 1024)); + } } From efac5b6fdaffe1fba46db4e3d752f9f1021a0881 Mon Sep 17 00:00:00 2001 From: ranxianglei Date: Tue, 12 Nov 2024 10:52:45 +0000 Subject: [PATCH 15/27] [format] fix checkstyle --- .../paimon/format/orc/OrcFileFormatTest.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java index 216c083eec66..3ca3ae6bc23a 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java @@ -90,12 +90,12 @@ public void testCreateCost() { for (int i = 0; i < 1000; i++) { create(); } - int fix_times = 10_000; + int times = 10_000; long start = System.nanoTime(); - for (int i = 0; i < fix_times; i++) { + for (int i = 0; i < times; i++) { create(); } - double cost = ((double) (System.nanoTime() - start)) / 1000_000 / fix_times; + double cost = ((double) (System.nanoTime() - start)) / 1000_000 / times; assertThat(cost * 500 < createConfCost); } @@ -105,12 +105,12 @@ public void testCreateCostWithRandomConfig() { for (int i = 0; i < 1000; i++) { createRandomConfig(); } - int fix_times = 10_000; + int times = 10_000; long start = System.nanoTime(); - for (int i = 0; i < fix_times; i++) { + for (int i = 0; i < times; i++) { createRandomConfig(); } - double cost = ((double) (System.nanoTime() - start)) / 1000_000 / fix_times; + double cost = ((double) (System.nanoTime() - start)) / 1000_000 / times; assertThat(cost * 10 < createConfCost); } @@ -118,12 +118,12 @@ private double createConfigCost() { for (int i = 0; i < 1000; i++) { createConfig(); } - int fix_times = 10_000; + int times = 10_000; long start = System.nanoTime(); - for (int i = 0; i < fix_times; i++) { + for (int i = 0; i < times; i++) { createConfig(); } - return ((double) (System.nanoTime() - start)) / 1000_000 / fix_times; + return ((double) (System.nanoTime() - start)) / 1000_000 / times; } private void createConfig() { From 15b191042870b77d6f74187712bcc5ef0a996b19 Mon Sep 17 00:00:00 2001 From: "ranxianglei.rxl" Date: Tue, 12 Nov 2024 18:58:28 +0800 Subject: [PATCH 16/27] [format] fix version caused error --- .../java/org/apache/paimon/format/orc/OrcFileFormatTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java index 3ca3ae6bc23a..38957240816a 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java @@ -136,7 +136,7 @@ private void create() { options.setString("haha", "1"); options.setString("compress", "zlib"); OrcFileFormat orcFileFormat = - new OrcFileFormatFactory().create(new FormatContext(options, 1024)); + new OrcFileFormatFactory().create(new FormatContext(options, 1024, 1024)); } private void createRandomConfig() { @@ -145,6 +145,6 @@ private void createRandomConfig() { options.setString("compress", "zlib"); options.setString("a", Math.random() + ""); OrcFileFormat orcFileFormat = - new OrcFileFormatFactory().create(new FormatContext(options, 1024)); + new OrcFileFormatFactory().create(new FormatContext(options, 1024, 1024)); } } From e1b3406aa50873c2ec00cdb1d1537369a0a786ef Mon Sep 17 00:00:00 2001 From: ranxianglei Date: Tue, 12 Nov 2024 11:05:07 +0000 Subject: [PATCH 17/27] [core] fix checkstyle --- .../paimon/operation/KeyValueFileStoreScanTest.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java index 8d6ab5aad37c..1d04d2f58f02 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java @@ -40,7 +40,14 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; From d48fff6100b9581e728af3be51e06108f6074625 Mon Sep 17 00:00:00 2001 From: ranxianglei Date: Wed, 13 Nov 2024 06:38:31 +0000 Subject: [PATCH 18/27] [format] add FormatPerformanceTest --- .../paimon/format/FileFormatFactory.java | 3 +- .../org.apache.paimon.factories.Factory | 18 ++++ .../paimon/format/FormatPerformanceTest.java | 101 ++++++++++++++++++ .../paimon/format/orc/OrcFileFormatTest.java | 4 +- 4 files changed, 123 insertions(+), 3 deletions(-) create mode 100644 paimon-format/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory create mode 100644 paimon-format/src/test/java/org/apache/paimon/format/FormatPerformanceTest.java diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java index b726a84f24a2..d377cb2815c2 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java @@ -19,13 +19,14 @@ package org.apache.paimon.format; import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.factories.Factory; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; import javax.annotation.Nullable; /** Factory to create {@link FileFormat}. */ -public interface FileFormatFactory { +public interface FileFormatFactory extends Factory { String identifier(); diff --git a/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory new file mode 100644 index 000000000000..4aa1b23b3b7a --- /dev/null +++ b/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.paimon.format.avro.AvroFileFormatFactory +org.apache.paimon.format.orc.OrcFileFormatFactory +org.apache.paimon.format.parquet.ParquetFileFormatFactory \ No newline at end of file diff --git a/paimon-format/src/test/java/org/apache/paimon/format/FormatPerformanceTest.java b/paimon-format/src/test/java/org/apache/paimon/format/FormatPerformanceTest.java new file mode 100644 index 000000000000..995c6143e0d0 --- /dev/null +++ b/paimon-format/src/test/java/org/apache/paimon/format/FormatPerformanceTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format; + +import org.apache.paimon.factories.FactoryUtil; +import org.apache.paimon.options.Options; + +import org.junit.Test; + +import java.util.ServiceLoader; + +import static org.assertj.core.api.Assertions.assertThat; + +/** test format performance. */ +public class FormatPerformanceTest { + + @Test + public void testFormatPerformance() { + double objectCacheCost = newFileFormatWithObjectCache(); + double serviceCost = newFileFormatWithServiceLoader(); + double factoryUtilCost = newFileFormatWithFactoryUtil(); + assertThat(objectCacheCost * 30 < serviceCost).isTrue(); + assertThat(objectCacheCost * 30 < factoryUtilCost).isTrue(); + } + + private double newFileFormatWithServiceLoader() { + for (int i = 0; i < 1000; i++) { + loadFromIdentifier(); + } + int times = 10_000; + long start = System.nanoTime(); + int nothing = 0; + for (int i = 0; i < times; i++) { + nothing += loadFromIdentifier(); + } + nothing = 0; + return ((double) (System.nanoTime() - start)) / 1000_000 / times + nothing; + } + + private double newFileFormatWithObjectCache() { + for (int i = 0; i < 1000; i++) { + newFileFormat(); + } + int times = 10_000; + long start = System.nanoTime(); + for (int i = 0; i < times; i++) { + newFileFormat(); + } + return ((double) (System.nanoTime() - start)) / 1000_000 / times; + } + + private double newFileFormatWithFactoryUtil() { + for (int i = 0; i < 1000; i++) { + newFileFormatFromFactoryUtil(); + } + int times = 10_000; + long start = System.nanoTime(); + for (int i = 0; i < times; i++) { + newFileFormatFromFactoryUtil(); + } + return ((double) (System.nanoTime() - start)) / 1000_000 / times; + } + + private int loadFromIdentifier() { + ServiceLoader serviceLoader = + ServiceLoader.load(FileFormatFactory.class, FileFormat.class.getClassLoader()); + int i = 0; + for (FileFormatFactory factory : serviceLoader) { + i++; + } + return i; + } + + private FileFormat newFileFormat() { + FileFormat orc = FileFormat.fromIdentifier("orc", new Options()); + return orc; + } + + @Test + public void newFileFormatFromFactoryUtil() { + FileFormatFactory fileFormatFactory = + FactoryUtil.discoverFactory( + FileFormatFactory.class.getClassLoader(), FileFormatFactory.class, "orc"); + } +} diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java index 38957240816a..9e5769595c32 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFileFormatTest.java @@ -96,7 +96,7 @@ public void testCreateCost() { create(); } double cost = ((double) (System.nanoTime() - start)) / 1000_000 / times; - assertThat(cost * 500 < createConfCost); + assertThat(cost * 500 < createConfCost).isTrue(); } @Test @@ -111,7 +111,7 @@ public void testCreateCostWithRandomConfig() { createRandomConfig(); } double cost = ((double) (System.nanoTime() - start)) / 1000_000 / times; - assertThat(cost * 10 < createConfCost); + assertThat(cost * 10 < createConfCost).isTrue(); } private double createConfigCost() { From a0efae22cea5eaa69d46dcda66157f508d740c6e Mon Sep 17 00:00:00 2001 From: ranxianglei Date: Wed, 13 Nov 2024 06:51:27 +0000 Subject: [PATCH 19/27] [format][tests] FormatPerformanceTest change to 10 times --- .../java/org/apache/paimon/format/FormatPerformanceTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-format/src/test/java/org/apache/paimon/format/FormatPerformanceTest.java b/paimon-format/src/test/java/org/apache/paimon/format/FormatPerformanceTest.java index 995c6143e0d0..ae5383aa0d27 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/FormatPerformanceTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/FormatPerformanceTest.java @@ -35,8 +35,8 @@ public void testFormatPerformance() { double objectCacheCost = newFileFormatWithObjectCache(); double serviceCost = newFileFormatWithServiceLoader(); double factoryUtilCost = newFileFormatWithFactoryUtil(); - assertThat(objectCacheCost * 30 < serviceCost).isTrue(); - assertThat(objectCacheCost * 30 < factoryUtilCost).isTrue(); + assertThat(objectCacheCost * 10 < serviceCost).isTrue(); + assertThat(objectCacheCost * 10 < factoryUtilCost).isTrue(); } private double newFileFormatWithServiceLoader() { From 016620ca6e88a1511f03205c355e3354831463f4 Mon Sep 17 00:00:00 2001 From: ranxianglei Date: Wed, 13 Nov 2024 07:05:10 +0000 Subject: [PATCH 20/27] [format][tests] FormatPerformanceTest change to lessthan to pass github tests, may machine not diffrence --- .../java/org/apache/paimon/format/FormatPerformanceTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-format/src/test/java/org/apache/paimon/format/FormatPerformanceTest.java b/paimon-format/src/test/java/org/apache/paimon/format/FormatPerformanceTest.java index ae5383aa0d27..cd72bcc53dd4 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/FormatPerformanceTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/FormatPerformanceTest.java @@ -35,8 +35,8 @@ public void testFormatPerformance() { double objectCacheCost = newFileFormatWithObjectCache(); double serviceCost = newFileFormatWithServiceLoader(); double factoryUtilCost = newFileFormatWithFactoryUtil(); - assertThat(objectCacheCost * 10 < serviceCost).isTrue(); - assertThat(objectCacheCost * 10 < factoryUtilCost).isTrue(); + assertThat(objectCacheCost).isLessThan(serviceCost); + assertThat(objectCacheCost).isLessThan(factoryUtilCost); } private double newFileFormatWithServiceLoader() { From 133a4918e80bd2d98d98536024b9e004a64fc95d Mon Sep 17 00:00:00 2001 From: ranxianglei Date: Thu, 14 Nov 2024 06:19:28 +0000 Subject: [PATCH 21/27] [format] merge conflicts --- .../org/apache/paimon/format/FileFormat.java | 39 ++----- .../operation/AbstractFileStoreScan.java | 6 +- .../paimon/operation/FileStoreScan.java | 3 +- .../table/source/AbstractDataTableScan.java | 3 +- .../paimon/table/source/InnerTableScan.java | 3 +- .../table/source/snapshot/SnapshotReader.java | 3 +- .../source/snapshot/SnapshotReaderImpl.java | 11 +- .../paimon/table/system/AuditLogTable.java | 3 +- .../operation/KeyValueFileStoreScanTest.java | 3 +- ...org.apache.paimon.format.FileFormatFactory | 18 ---- .../paimon/format/FormatPerformanceTest.java | 101 ------------------ 11 files changed, 30 insertions(+), 163 deletions(-) delete mode 100644 paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory delete mode 100644 paimon-format/src/test/java/org/apache/paimon/format/FormatPerformanceTest.java diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java index ab75f0348584..83c929e94cec 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java @@ -19,8 +19,8 @@ package org.apache.paimon.format; import org.apache.paimon.CoreOptions; +import org.apache.paimon.factories.FactoryUtil; import org.apache.paimon.format.FileFormatFactory.FormatContext; -import org.apache.paimon.fs.ObjectCacheManager; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.statistics.SimpleColStatsCollector; @@ -28,13 +28,11 @@ import javax.annotation.Nullable; -import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.ServiceLoader; /** * Factory class which creates reader and writer factories for specific file format. @@ -43,9 +41,6 @@ */ public abstract class FileFormat { - private static final ObjectCacheManager formatFactoryCache = - ObjectCacheManager.newObjectCacheManager(Duration.ofDays(365), 1000); - protected String formatIdentifier; protected FileFormat(String formatIdentifier) { @@ -93,34 +88,12 @@ public static FileFormat fromIdentifier(String identifier, Options options) { /** Create a {@link FileFormat} from format identifier and format options. */ public static FileFormat fromIdentifier(String identifier, FormatContext context) { - return fromIdentifier(identifier, context, FileFormat.class.getClassLoader()) - .orElseThrow( - () -> - new RuntimeException( - String.format( - "Could not find a FileFormatFactory implementation class for %s format", - identifier))); - } - - private static Optional fromIdentifier( - String formatIdentifier, FormatContext context, ClassLoader classLoader) { - FileFormatFactory fileFormatFactory = - formatFactoryCache.getIfPresent(formatIdentifier.toLowerCase()); - if (fileFormatFactory != null) { - return Optional.of(fileFormatFactory.create(context)); - } - - ServiceLoader serviceLoader = - ServiceLoader.load(FileFormatFactory.class, classLoader); - for (FileFormatFactory factory : serviceLoader) { - formatFactoryCache.put(factory.identifier(), factory); - if (factory.identifier().equals(formatIdentifier.toLowerCase())) { - return Optional.of(factory.create(context)); - } - } - - return Optional.empty(); + FactoryUtil.discoverFactory( + FileFormatFactory.class.getClassLoader(), + FileFormatFactory.class, + identifier); + return fileFormatFactory.create(context); } protected Options getIdentifierPrefixOptions(Options options) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index 99e8b411fbaf..deb6fadd7c19 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -85,7 +85,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { private Snapshot specifiedSnapshot = null; private Filter bucketFilter = null; - private List buckets; + private Collection buckets; private BiFilter totalAwareBucketFilter = null; private List specifiedManifests = null; protected ScanMode scanMode = ScanMode.ALL; @@ -138,7 +138,7 @@ public FileStoreScan withBucket(int bucket) { } @Override - public FileStoreScan withBuckets(List buckets) { + public FileStoreScan withBuckets(Collection buckets) { this.bucketFilter = buckets::contains; this.buckets = buckets; return this; @@ -444,7 +444,7 @@ private Filter createCacheRowFilter() { * *

Implemented to {@link InternalRow} is for performance (No deserialization). */ - private static List createPushDownFilter(List buckets) { + private static List createPushDownFilter(Collection buckets) { if (buckets == null || buckets.isEmpty()) { return null; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java index 65af1e6fe259..89e7e7aace90 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java @@ -38,6 +38,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collection; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; @@ -57,7 +58,7 @@ public interface FileStoreScan { FileStoreScan withBucket(int bucket); - FileStoreScan withBuckets(List buckets); + FileStoreScan withBuckets(Collection buckets); FileStoreScan withBucketFilter(Filter bucketFilter); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java index 4b071ba617ab..9299b86aa420 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java @@ -48,6 +48,7 @@ import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; @@ -80,7 +81,7 @@ public AbstractDataTableScan withBucketFilter(Filter bucketFilter) { } @Override - public AbstractDataTableScan withBuckets(List buckets) { + public AbstractDataTableScan withBuckets(Collection buckets) { snapshotReader.withBuckets(buckets); return this; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java index 5dfd83640f5d..6c3b28f0f4ce 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java @@ -23,6 +23,7 @@ import org.apache.paimon.predicate.Predicate; import org.apache.paimon.utils.Filter; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -52,7 +53,7 @@ default InnerTableScan withBucket(Integer bucket) { return withBuckets(Collections.singletonList(bucket)); } - default InnerTableScan withBuckets(List buckets) { + default InnerTableScan withBuckets(Collection buckets) { throw new RuntimeException("not impl withBuckets for " + this.getClass().getName()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java index 76a550ba4ce6..026e529dc0c8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java @@ -39,6 +39,7 @@ import javax.annotation.Nullable; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -81,7 +82,7 @@ public interface SnapshotReader { SnapshotReader withBucket(int bucket); - SnapshotReader withBuckets(List buckets); + SnapshotReader withBuckets(Collection buckets); SnapshotReader withBucketFilter(Filter bucketFilter); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index 74f257ddcf9d..c3f027da962f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -53,6 +53,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -246,7 +247,7 @@ public SnapshotReader withBucket(int bucket) { return this; } - public SnapshotReader withBuckets(List buckets) { + public SnapshotReader withBuckets(Collection buckets) { scan.withBuckets(buckets); return this; } @@ -277,7 +278,13 @@ public SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubt Math.abs(file.hashCode() % numberOfParallelSubtasks) == indexOfThisSubtask); } else { - withBucketFilter(bucket -> bucket % numberOfParallelSubtasks == indexOfThisSubtask); + Set buckets = new HashSet<>(); + for (int bucket = 0; bucket < this.tableSchema.numBuckets(); bucket++) { + if (bucket % numberOfParallelSubtasks == indexOfThisSubtask) { + buckets.add(bucket); + } + } + withBuckets(buckets); } return this; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index a11e88074932..2e1d913648e7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -69,6 +69,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -331,7 +332,7 @@ public SnapshotReader withBucket(int bucket) { } @Override - public SnapshotReader withBuckets(List buckets) { + public SnapshotReader withBuckets(Collection buckets) { wrapped.withBuckets(buckets); return this; } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java index 1d04d2f58f02..07b2fc48e078 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java @@ -235,7 +235,8 @@ public void testWithBuckets() throws Exception { int wantedBucket1 = random.nextInt(NUM_BUCKETS); int wantedBucket2 = random.nextInt(NUM_BUCKETS); int wantedBucket3 = random.nextInt(NUM_BUCKETS); - List buckets = Arrays.asList(wantedBucket1, wantedBucket2, wantedBucket3); + Set buckets = + new HashSet<>(Arrays.asList(wantedBucket1, wantedBucket2, wantedBucket3)); FileStoreScan scan = store.newScan(); scan.withSnapshot(snapshot.id()); diff --git a/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory b/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory deleted file mode 100644 index 7af6f79b3493..000000000000 --- a/paimon-format/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory +++ /dev/null @@ -1,18 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -org.apache.paimon.format.avro.AvroFileFormatFactory -org.apache.paimon.format.orc.OrcFileFormatFactory -org.apache.paimon.format.parquet.ParquetFileFormatFactory diff --git a/paimon-format/src/test/java/org/apache/paimon/format/FormatPerformanceTest.java b/paimon-format/src/test/java/org/apache/paimon/format/FormatPerformanceTest.java deleted file mode 100644 index cd72bcc53dd4..000000000000 --- a/paimon-format/src/test/java/org/apache/paimon/format/FormatPerformanceTest.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.format; - -import org.apache.paimon.factories.FactoryUtil; -import org.apache.paimon.options.Options; - -import org.junit.Test; - -import java.util.ServiceLoader; - -import static org.assertj.core.api.Assertions.assertThat; - -/** test format performance. */ -public class FormatPerformanceTest { - - @Test - public void testFormatPerformance() { - double objectCacheCost = newFileFormatWithObjectCache(); - double serviceCost = newFileFormatWithServiceLoader(); - double factoryUtilCost = newFileFormatWithFactoryUtil(); - assertThat(objectCacheCost).isLessThan(serviceCost); - assertThat(objectCacheCost).isLessThan(factoryUtilCost); - } - - private double newFileFormatWithServiceLoader() { - for (int i = 0; i < 1000; i++) { - loadFromIdentifier(); - } - int times = 10_000; - long start = System.nanoTime(); - int nothing = 0; - for (int i = 0; i < times; i++) { - nothing += loadFromIdentifier(); - } - nothing = 0; - return ((double) (System.nanoTime() - start)) / 1000_000 / times + nothing; - } - - private double newFileFormatWithObjectCache() { - for (int i = 0; i < 1000; i++) { - newFileFormat(); - } - int times = 10_000; - long start = System.nanoTime(); - for (int i = 0; i < times; i++) { - newFileFormat(); - } - return ((double) (System.nanoTime() - start)) / 1000_000 / times; - } - - private double newFileFormatWithFactoryUtil() { - for (int i = 0; i < 1000; i++) { - newFileFormatFromFactoryUtil(); - } - int times = 10_000; - long start = System.nanoTime(); - for (int i = 0; i < times; i++) { - newFileFormatFromFactoryUtil(); - } - return ((double) (System.nanoTime() - start)) / 1000_000 / times; - } - - private int loadFromIdentifier() { - ServiceLoader serviceLoader = - ServiceLoader.load(FileFormatFactory.class, FileFormat.class.getClassLoader()); - int i = 0; - for (FileFormatFactory factory : serviceLoader) { - i++; - } - return i; - } - - private FileFormat newFileFormat() { - FileFormat orc = FileFormat.fromIdentifier("orc", new Options()); - return orc; - } - - @Test - public void newFileFormatFromFactoryUtil() { - FileFormatFactory fileFormatFactory = - FactoryUtil.discoverFactory( - FileFormatFactory.class.getClassLoader(), FileFormatFactory.class, "orc"); - } -} From 282a2c99ca34935a3a74f5d3a141e689fa828a36 Mon Sep 17 00:00:00 2001 From: ranxianglei Date: Thu, 14 Nov 2024 06:59:40 +0000 Subject: [PATCH 22/27] [format] id to lowercase --- .../src/main/java/org/apache/paimon/format/FileFormat.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java index 83c929e94cec..9ddb58257324 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java @@ -88,6 +88,9 @@ public static FileFormat fromIdentifier(String identifier, Options options) { /** Create a {@link FileFormat} from format identifier and format options. */ public static FileFormat fromIdentifier(String identifier, FormatContext context) { + if (identifier != null) { + identifier = identifier.toLowerCase(); + } FileFormatFactory fileFormatFactory = FactoryUtil.discoverFactory( FileFormatFactory.class.getClassLoader(), From 884d12f772d7e56807a3a2826a0c8d7080e0dc93 Mon Sep 17 00:00:00 2001 From: ranxianglei Date: Thu, 14 Nov 2024 07:42:34 +0000 Subject: [PATCH 23/27] [tests] core org.apache.paimon.factories.Factory --- .../services/org.apache.paimon.factories.Factory | 1 + .../org.apache.paimon.format.FileFormatFactory | 16 ---------------- 2 files changed, 1 insertion(+), 16 deletions(-) delete mode 100644 paimon-core/src/test/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory diff --git a/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory index 7eb517ab9835..5e87f5e9ea2e 100644 --- a/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -13,4 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. +org.apache.paimon.format.FileStatsExtractingAvroFormatFactory org.apache.paimon.mergetree.compact.aggregate.TestCustomAggFactory \ No newline at end of file diff --git a/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory b/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory deleted file mode 100644 index 14386c45f21b..000000000000 --- a/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -org.apache.paimon.format.FileStatsExtractingAvroFormatFactory From a1cc9f46ffccfbe886ec11d825310d4410f32b46 Mon Sep 17 00:00:00 2001 From: "ranxianglei.rxl" Date: Thu, 14 Nov 2024 16:38:38 +0800 Subject: [PATCH 24/27] [tests] fileFormat factories add to paimon-flink-common --- .../org.apache.paimon.factories.Factory | 5 +++++ .../org.apache.paimon.format.FileFormatFactory | 18 ------------------ 2 files changed, 5 insertions(+), 18 deletions(-) delete mode 100644 paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 0ff3ac1f1e1c..e5d063979da6 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -80,3 +80,8 @@ org.apache.paimon.flink.procedure.MarkPartitionDoneProcedure org.apache.paimon.flink.procedure.CloneProcedure org.apache.paimon.flink.procedure.CompactManifestProcedure org.apache.paimon.flink.procedure.RefreshObjectTableProcedure + +### fileFormat factories +org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat$OrcFactory +org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat$ParquetFactory +org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat$AvroFactory diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory deleted file mode 100644 index 6e7553d5c668..000000000000 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.format.FileFormatFactory +++ /dev/null @@ -1,18 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat$OrcFactory -org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat$ParquetFactory -org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat$AvroFactory From e401844047ac742e8cc2ebd64326735ae3454989 Mon Sep 17 00:00:00 2001 From: ranxianglei Date: Fri, 15 Nov 2024 07:20:44 +0000 Subject: [PATCH 25/27] [core] resolve withBuckets commit --- .../operation/AbstractFileStoreScan.java | 5 ++--- .../table/FallbackReadFileStoreTable.java | 9 +++++++++ .../paimon/table/source/InnerTableScan.java | 3 ++- .../paimon/table/system/AuditLogTable.java | 18 ++++++++++++++++++ 4 files changed, 31 insertions(+), 4 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index deb6fadd7c19..306fd6f5a7c8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -440,9 +440,8 @@ private Filter createCacheRowFilter() { } /** - * Read the corresponding entries based on the current required partition and bucket. - * - *

Implemented to {@link InternalRow} is for performance (No deserialization). + * Read the corresponding entries based on the current required bucket, but push down into file + * format . */ private static List createPushDownFilter(Collection buckets) { if (buckets == null || buckets.isEmpty()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java index f1a60b9713f9..663933f337b5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java @@ -34,6 +34,7 @@ import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.DataTableScan; import org.apache.paimon.table.source.InnerTableRead; +import org.apache.paimon.table.source.InnerTableScan; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; import org.apache.paimon.table.source.TableScan; @@ -44,6 +45,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -266,6 +268,13 @@ public Scan withBucketFilter(Filter bucketFilter) { return this; } + @Override + public InnerTableScan withBuckets(Collection buckets) { + mainScan.withBuckets(buckets); + fallbackScan.withBuckets(buckets); + return this; + } + @Override public Scan withLevelFilter(Filter levelFilter) { mainScan.withLevelFilter(levelFilter); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java index 6c3b28f0f4ce..1e17e001694e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java @@ -54,7 +54,8 @@ default InnerTableScan withBucket(Integer bucket) { } default InnerTableScan withBuckets(Collection buckets) { - throw new RuntimeException("not impl withBuckets for " + this.getClass().getName()); + // return this is not safe for too many class not impl this method and withBucketFilter + return this; } default InnerTableScan withLevelFilter(Filter levelFilter) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index 2e1d913648e7..5105f6fc8fd2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -441,6 +441,12 @@ public InnerTableScan withBucketFilter(Filter bucketFilter) { return this; } + @Override + public InnerTableScan withBuckets(Collection buckets) { + batchScan.withBuckets(buckets); + return this; + } + @Override public InnerTableScan withLevelFilter(Filter levelFilter) { batchScan.withLevelFilter(levelFilter); @@ -478,6 +484,18 @@ public StreamDataTableScan withFilter(Predicate predicate) { return this; } + @Override + public InnerTableScan withBucketFilter(Filter bucketFilter) { + streamScan.withBucketFilter(bucketFilter); + return this; + } + + @Override + public InnerTableScan withBuckets(Collection buckets) { + streamScan.withBuckets(buckets); + return this; + } + @Override public StartingContext startingContext() { return streamScan.startingContext(); From 710af068b40656a151a1b7d4faa91043d49318d7 Mon Sep 17 00:00:00 2001 From: ranxianglei Date: Fri, 15 Nov 2024 07:49:32 +0000 Subject: [PATCH 26/27] [format] no need call rowMapper under getArray --- .../apache/paimon/format/orc/reader/OrcArrayColumnVector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java index 25a1935f3e4b..6aeb8c98892e 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcArrayColumnVector.java @@ -42,7 +42,7 @@ public OrcArrayColumnVector( @Override public InternalArray getArray(int i) { - i = rowMapper(i); + // no need call rowMapper(i) here . long offset = hiveVector.offsets[i]; long length = hiveVector.lengths[i]; return new ColumnarArray(paimonVector, (int) offset, (int) length); From 669dc30770f70a4103300eb6df607fd1038a3553 Mon Sep 17 00:00:00 2001 From: ranxianglei Date: Mon, 18 Nov 2024 04:05:50 +0000 Subject: [PATCH 27/27] [core] cancel manifest format factory cache for while . --- .../apache/paimon/manifest/ManifestFile.java | 48 +------------------ 1 file changed, 2 insertions(+), 46 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java index 541cf406472e..41b480e8a427 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java @@ -24,7 +24,6 @@ import org.apache.paimon.format.FormatWriterFactory; import org.apache.paimon.format.SimpleStatsCollector; import org.apache.paimon.fs.FileIO; -import org.apache.paimon.fs.ObjectCacheManager; import org.apache.paimon.fs.Path; import org.apache.paimon.io.RollingFileWriter; import org.apache.paimon.io.SingleFileWriter; @@ -41,9 +40,7 @@ import javax.annotation.Nullable; import java.io.IOException; -import java.time.Duration; import java.util.List; -import java.util.Objects; /** * This file includes several {@link ManifestEntry}s, representing the additional changes since last @@ -204,57 +201,16 @@ public ManifestFile create() { return create(null); } - private static class FormatReaderFactoryKey { - private final RowType entryType; - private final List filters; - private final String formatIdentifier; - - public FormatReaderFactoryKey( - String formatIdentifier, RowType entryType, List filters) { - this.entryType = entryType; - this.filters = filters; - this.formatIdentifier = formatIdentifier; - } - - @Override - public boolean equals(Object o) { - if (o == null || getClass() != o.getClass()) { - return false; - } - FormatReaderFactoryKey that = (FormatReaderFactoryKey) o; - return Objects.equals(entryType, that.entryType) - && Objects.equals(filters, that.filters) - && Objects.equals(formatIdentifier, that.formatIdentifier); - } - - @Override - public int hashCode() { - return Objects.hash(entryType, filters, formatIdentifier); - } - } - - private static final ObjectCacheManager - readers = ObjectCacheManager.newObjectCacheManager(Duration.ofDays(365), 1000); - private static final ObjectCacheManager - writers = ObjectCacheManager.newObjectCacheManager(Duration.ofDays(365), 1000); - public ManifestFile create(List filters) { - String formatIdentifier = this.fileFormat.getFormatIdentifier(); RowType entryType = VersionedObjectSerializer.versionType(ManifestEntry.SCHEMA); - FormatReaderFactoryKey formatReaderFactoryKey = - new FormatReaderFactoryKey(formatIdentifier, entryType, filters); return new ManifestFile( fileIO, schemaManager, partitionType, ManifestEntrySerializer.getInstance(), entryType, - readers.get( - formatReaderFactoryKey, - (ignore) -> fileFormat.createReaderFactory(entryType, filters)), - writers.get( - formatReaderFactoryKey, - (ignore) -> fileFormat.createWriterFactory(entryType)), + fileFormat.createReaderFactory(entryType, filters), + fileFormat.createWriterFactory(entryType), compression, pathFactory.manifestFileFactory(), suggestedFileSize,