Skip to content

Commit

Permalink
[core]Support ignoring specified fields while generating -U, +U chang…
Browse files Browse the repository at this point in the history
…elog for the same record

Change-Id: I9afa7ba916513c65f7e708ff9a98d8ac85ea755a
  • Loading branch information
chenxinwei committed Sep 10, 2024
1 parent 588b6e0 commit 026bda7
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 5 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@
<td>Boolean</td>
<td>Whether to generate -U, +U changelog for the same record. This configuration is only valid for the changelog-producer is lookup or full-compaction.</td>
</tr>
<tr>
<td><h5>changelog-producer.row-deduplicate-ignore-fields</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Fields that are ignored for comparison while generating -U, +U changelog for the same record. This configuration is only valid for the changelog-producer.row-deduplicate is true.</td>
</tr>
<tr>
<td><h5>changelog.num-retained.max</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
13 changes: 13 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,13 @@ public class CoreOptions implements Serializable {
.withDescription(
"Whether to generate -U, +U changelog for the same record. This configuration is only valid for the changelog-producer is lookup or full-compaction.");

/**
 * Names of the value fields to leave out of the comparison when deciding whether a -U, +U
 * changelog pair should be produced for the same record. The option is a single string with no
 * default value; the accessor in this class splits it on commas to obtain the individual
 * field names.
 */
public static final ConfigOption<String> CHANGELOG_PRODUCER_ROW_DEDUPLICATE_IGNORE_FIELDS =
key("changelog-producer.row-deduplicate-ignore-fields")
.stringType()
.noDefaultValue()
.withDescription(
"Fields that are ignored for comparison while generating -U, +U changelog for the same record. This configuration is only valid for the changelog-producer.row-deduplicate is true.");

@Immutable
public static final ConfigOption<String> SEQUENCE_FIELD =
key("sequence.field")
Expand Down Expand Up @@ -1776,6 +1783,12 @@ public boolean changelogRowDeduplicate() {
return options.get(CHANGELOG_PRODUCER_ROW_DEDUPLICATE);
}

/**
 * Returns the field names configured under
 * {@code changelog-producer.row-deduplicate-ignore-fields}.
 *
 * <p>The option value is a comma-separated list. Whitespace around each name is dropped, so
 * {@code "f0, f1"} and {@code "f0,f1"} produce the same result. An unset or blank option
 * yields an empty list.
 *
 * <p>Despite the method name, the returned fields are not restricted to sequence fields; any
 * field name listed in the option is returned.
 *
 * @return the configured field names, or an empty list when nothing is configured
 */
public List<String> changelogRowDeduplicateIgnoreSequenceField() {
    return options.getOptional(CHANGELOG_PRODUCER_ROW_DEDUPLICATE_IGNORE_FIELDS)
            // Strip outer whitespace first so the split regex only has to deal with separators.
            .map(String::trim)
            // A blank value would otherwise split into a single empty name.
            .filter(s -> !s.isEmpty())
            .map(s -> Arrays.asList(s.split("\\s*,\\s*")))
            .orElse(Collections.emptyList());
}

/** Returns the boolean value configured for {@code SCAN_PLAN_SORT_PARTITION}. */
public boolean scanPlanSortPartition() {
return options.get(SCAN_PLAN_SORT_PARTITION);
}
Expand Down
19 changes: 17 additions & 2 deletions paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ChangelogDeduplicateEqualiserSupplier;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.KeyComparatorSupplier;
import org.apache.paimon.utils.UserDefinedSeqComparator;
import org.apache.paimon.utils.ValueEqualiserSupplier;

import java.util.Comparator;
import java.util.HashMap;
Expand All @@ -52,6 +52,7 @@
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.IntStream;

import static org.apache.paimon.predicate.PredicateBuilder.and;
import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
Expand Down Expand Up @@ -93,7 +94,13 @@ public KeyValueFileStore(
this.keyValueFieldsExtractor = keyValueFieldsExtractor;
this.mfFactory = mfFactory;
this.keyComparatorSupplier = new KeyComparatorSupplier(keyType);
this.valueEqualiserSupplier = new ValueEqualiserSupplier(valueType);
List<String> ignoreFields = options.changelogRowDeduplicateIgnoreSequenceField();
int[] projection =
options.changelogRowDeduplicate() && !ignoreFields.isEmpty()
? getProjectionWithIgnoreFields(valueType, ignoreFields)
: null;
this.valueEqualiserSupplier =
new ChangelogDeduplicateEqualiserSupplier(valueType, projection);
this.tableName = tableName;
}

Expand Down Expand Up @@ -243,4 +250,12 @@ public void pushdown(Predicate keyFilter) {
/**
 * Returns a key comparator obtained from {@code keyComparatorSupplier}, which the constructor
 * creates from the key type.
 */
public Comparator<InternalRow> newKeyComparator() {
return keyComparatorSupplier.get();
}

/**
 * Builds the projection of field positions that remain after removing the ignored fields.
 *
 * @param rowType row type whose field names are matched against {@code ignoreFields}
 * @param ignoreFields names of the fields to leave out of the projection
 * @return positions (in ascending order) of the fields of {@code rowType} whose names are not
 *     contained in {@code ignoreFields}
 */
private int[] getProjectionWithIgnoreFields(RowType rowType, List<String> ignoreFields) {
    List<String> names = rowType.getFieldNames();
    int fieldCount = rowType.getFieldCount();

    // Collect the surviving positions into an over-sized buffer, then trim it to size.
    int[] kept = new int[fieldCount];
    int size = 0;
    for (int pos = 0; pos < fieldCount; pos++) {
        if (ignoreFields.contains(names.get(pos))) {
            continue;
        }
        kept[size++] = pos;
    }

    int[] projection = new int[size];
    System.arraycopy(kept, 0, projection, 0, size);
    return projection;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,29 @@
import static org.apache.paimon.codegen.CodeGenUtils.newRecordEqualiser;

/**
 * A {@link Supplier} that returns the equaliser for the file store value.
 *
 * <p>The supplier keeps the field types of the given row type and an optional projection. When
 * the projection is {@code null} the equaliser is created from the field types alone;
 * otherwise the projection is handed to {@code newRecordEqualiser} together with the field
 * types.
 */
public class ValueEqualiserSupplier implements SerializableSupplier<RecordEqualiser> {
public class ChangelogDeduplicateEqualiserSupplier
implements SerializableSupplier<RecordEqualiser> {

private static final long serialVersionUID = 1L;

private final List<DataType> fieldTypes;

public ValueEqualiserSupplier(RowType keyType) {
// Optional projection forwarded to newRecordEqualiser; null selects the variant without a
// projection. Presumably the positions of the fields to compare -- TODO confirm against
// CodeGenUtils.newRecordEqualiser.
private final int[] projection;

/**
 * Creates a supplier without a projection.
 *
 * <p>NOTE(review): the parameter is named {@code keyType}, but KeyValueFileStore passes its
 * value type here -- consider renaming.
 */
public ChangelogDeduplicateEqualiserSupplier(RowType keyType) {
this.fieldTypes = keyType.getFieldTypes();
this.projection = null;
}

/**
 * Creates a supplier with the given projection.
 *
 * @param keyType row type whose field types are used to build the equaliser
 * @param projection projection to forward to the equaliser, or {@code null} for none; the
 *     array reference is stored as-is, without a defensive copy
 */
public ChangelogDeduplicateEqualiserSupplier(RowType keyType, int[] projection) {
this.fieldTypes = keyType.getFieldTypes();
this.projection = projection;
}

@Override
public RecordEqualiser get() {
return newRecordEqualiser(fieldTypes);
// Use the projection-aware factory only when a projection was supplied.
return this.projection == null
? newRecordEqualiser(fieldTypes)
: newRecordEqualiser(fieldTypes, projection);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@
import org.apache.paimon.mergetree.compact.aggregate.FieldLastValueAgg;
import org.apache.paimon.mergetree.compact.aggregate.FieldSumAgg;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ChangelogDeduplicateEqualiserSupplier;
import org.apache.paimon.utils.UserDefinedSeqComparator;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
Expand All @@ -41,11 +43,13 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.IntStream;

import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.apache.paimon.types.RowKind.DELETE;
Expand Down Expand Up @@ -214,6 +218,77 @@ public void testDeduplicate(boolean changelogRowDeduplicate) {
assertThat(kv.value().getInt(0)).isEqualTo(2);
}

/**
 * Checks changelog row deduplication when the value equaliser is built with a projection that
 * leaves out the sequence field {@code f1}.
 *
 * <p>Two scenarios are covered: records that differ only in {@code f1} must produce no
 * changelog, while records that also differ in {@code f0} must produce a -U, +U pair.
 */
@Test
public void testDeduplicateIgnoreSequenceField() {
    Map<InternalRow, KeyValue> highLevel = new HashMap<>();
    RowType valueType =
            RowType.builder()
                    .fields(
                            new DataType[] {DataTypes.INT(), DataTypes.INT()},
                            new String[] {"f0", "f1"})
                    .build();
    UserDefinedSeqComparator userDefinedSeqComparator =
            UserDefinedSeqComparator.create(
                    valueType, CoreOptions.fromMap(ImmutableMap.of("sequence.field", "f1")));
    // Use an AssertJ assertion rather than the Java `assert` keyword, which is skipped when the
    // JVM runs without -ea.
    assertThat(userDefinedSeqComparator).isNotNull();

    // Projection over every value field except the ignored sequence field "f1".
    List<String> ignoreFields = Collections.singletonList("f1");
    List<String> fieldNames = valueType.getFieldNames();
    int[] projection =
            IntStream.range(0, valueType.getFieldCount())
                    .filter(idx -> !ignoreFields.contains(fieldNames.get(idx)))
                    .toArray();
    ChangelogDeduplicateEqualiserSupplier changelogDeduplicateEqualiserSupplier =
            new ChangelogDeduplicateEqualiserSupplier(valueType, projection);
    LookupChangelogMergeFunctionWrapper function =
            new LookupChangelogMergeFunctionWrapper(
                    LookupMergeFunction.wrap(
                            DeduplicateMergeFunction.factory(),
                            RowType.of(DataTypes.INT()),
                            valueType),
                    highLevel::get,
                    changelogDeduplicateEqualiserSupplier.get(),
                    true,
                    LookupStrategy.from(false, true, false, false),
                    null,
                    userDefinedSeqComparator);

    // With level-0 'insert' record, with level-x (x > 0) same record. Notice that sequence
    // fields in records are different.
    function.reset();
    function.add(new KeyValue().replace(row(1), 1, INSERT, row(1, 1)).setLevel(2));
    function.add(new KeyValue().replace(row(1), 2, INSERT, row(1, 2)).setLevel(0));
    ChangelogResult result = function.getResult();
    assertThat(result).isNotNull();
    List<KeyValue> changelogs = result.changelogs();
    assertThat(changelogs).isEmpty();
    KeyValue kv = result.result();
    assertThat(kv).isNotNull();
    assertThat(kv.valueKind()).isEqualTo(INSERT);
    assertThat(kv.value().getInt(0)).isEqualTo(1);
    assertThat(kv.value().getInt(1)).isEqualTo(2);

    // With level-0 'insert' record, with level-x (x > 0) different record.
    function.reset();
    function.add(new KeyValue().replace(row(1), 1, INSERT, row(1, 1)).setLevel(1));
    function.add(new KeyValue().replace(row(1), 2, INSERT, row(2, 2)).setLevel(0));
    result = function.getResult();
    assertThat(result).isNotNull();
    changelogs = result.changelogs();
    assertThat(changelogs).hasSize(2);
    assertThat(changelogs.get(0).valueKind()).isEqualTo(UPDATE_BEFORE);
    assertThat(changelogs.get(0).value().getInt(0)).isEqualTo(1);
    assertThat(changelogs.get(0).value().getInt(1)).isEqualTo(1);
    assertThat(changelogs.get(1).valueKind()).isEqualTo(UPDATE_AFTER);
    assertThat(changelogs.get(1).value().getInt(0)).isEqualTo(2);
    assertThat(changelogs.get(1).value().getInt(1)).isEqualTo(2);
    kv = result.result();
    assertThat(kv).isNotNull();
    assertThat(kv.valueKind()).isEqualTo(INSERT);
    assertThat(kv.value().getInt(0)).isEqualTo(2);
    assertThat(kv.value().getInt(1)).isEqualTo(2);
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testSum(boolean changelogRowDeduplicate) {
Expand Down

0 comments on commit 026bda7

Please sign in to comment.