Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core]Support ignoring specified fields while generating -U, +U changelog for the same record #3

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@
<td>Boolean</td>
<td>Whether to generate -U, +U changelog for the same record. This configuration is only valid for the changelog-producer is lookup or full-compaction.</td>
</tr>
<tr>
<td><h5>changelog-producer.row-deduplicate-ignore-fields</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Fields that are ignored for comparison while generating -U, +U changelog for the same record. This configuration is only valid for the changelog-producer.row-deduplicate is true.</td>
</tr>
<tr>
<td><h5>changelog.num-retained.max</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class EqualiserCodeGenerator(fieldTypes: Array[DataType], fields: Array[Int]) {
val ctx = new CodeGeneratorContext
val className = newName(name)

val containsIgnoreFields = fieldTypes.length > fields.length
val equalsMethodCodes = for (idx <- fields) yield generateEqualsMethod(ctx, idx)
val equalsMethodCalls = for (idx <- fields) yield {
val methodName = getEqualsMethodName(idx)
Expand All @@ -57,7 +58,7 @@ class EqualiserCodeGenerator(fieldTypes: Array[DataType], fields: Array[Int]) {

@Override
public boolean equals($ROW_DATA $LEFT_INPUT, $ROW_DATA $RIGHT_INPUT) {
if ($LEFT_INPUT instanceof $BINARY_ROW && $RIGHT_INPUT instanceof $BINARY_ROW) {
if ($LEFT_INPUT instanceof $BINARY_ROW && $RIGHT_INPUT instanceof $BINARY_ROW && !$containsIgnoreFields) {
return $LEFT_INPUT.equals($RIGHT_INPUT);
}

Expand Down
13 changes: 13 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,13 @@ public class CoreOptions implements Serializable {
.withDescription(
"Whether to generate -U, +U changelog for the same record. This configuration is only valid for the changelog-producer is lookup or full-compaction.");

/**
 * Names of the fields to leave out of the row comparison used when deciding whether a -U, +U
 * changelog pair is produced for the same record. The raw string value is split on {@code ","}
 * by {@code changelogRowDeduplicateIgnoreFields()}; there is no default value.
 */
public static final ConfigOption<String> CHANGELOG_PRODUCER_ROW_DEDUPLICATE_IGNORE_FIELDS =
key("changelog-producer.row-deduplicate-ignore-fields")
.stringType()
.noDefaultValue()
.withDescription(
"Fields that are ignored for comparison while generating -U, +U changelog for the same record. This configuration is only valid for the changelog-producer.row-deduplicate is true.");

@Immutable
public static final ConfigOption<String> SEQUENCE_FIELD =
key("sequence.field")
Expand Down Expand Up @@ -1776,6 +1783,12 @@ public boolean changelogRowDeduplicate() {
return options.get(CHANGELOG_PRODUCER_ROW_DEDUPLICATE);
}

/**
 * Returns the field names configured through {@link
 * #CHANGELOG_PRODUCER_ROW_DEDUPLICATE_IGNORE_FIELDS}.
 *
 * <p>The option value is a comma-separated list. Whitespace around each name is dropped, so
 * {@code "f1, f2"} yields {@code ["f1", "f2"]} rather than {@code ["f1", " f2"]}; an untrimmed
 * name would never match a real field and would silently not be ignored.
 *
 * @return the configured field names, or an empty list when the option is not set
 */
public List<String> changelogRowDeduplicateIgnoreFields() {
    return options.getOptional(CHANGELOG_PRODUCER_ROW_DEDUPLICATE_IGNORE_FIELDS)
            // trim() removes leading/trailing blanks of the whole value; the regex swallows
            // blanks on either side of every comma.
            .map(s -> Arrays.asList(s.trim().split("\\s*,\\s*")))
            .orElse(Collections.emptyList());
}

/** Returns the configured value of the {@code SCAN_PLAN_SORT_PARTITION} option. */
public boolean scanPlanSortPartition() {
return options.get(SCAN_PLAN_SORT_PARTITION);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
private final RowType valueType;
private final KeyValueFieldsExtractor keyValueFieldsExtractor;
private final Supplier<Comparator<InternalRow>> keyComparatorSupplier;
private final Supplier<RecordEqualiser> valueEqualiserSupplier;
private final Supplier<RecordEqualiser> logDedupEqualSupplier;
private final MergeFunctionFactory<KeyValue> mfFactory;
private final String tableName;

Expand All @@ -93,7 +93,11 @@ public KeyValueFileStore(
this.keyValueFieldsExtractor = keyValueFieldsExtractor;
this.mfFactory = mfFactory;
this.keyComparatorSupplier = new KeyComparatorSupplier(keyType);
this.valueEqualiserSupplier = new ValueEqualiserSupplier(valueType);
List<String> ignoreFields = options.changelogRowDeduplicateIgnoreFields();
this.logDedupEqualSupplier =
options.changelogRowDeduplicate() && !ignoreFields.isEmpty()
? ValueEqualiserSupplier.fromIgnoreFields(valueType, ignoreFields)
: new ValueEqualiserSupplier(valueType);
this.tableName = tableName;
}

Expand Down Expand Up @@ -174,7 +178,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma
valueType,
keyComparatorSupplier,
() -> UserDefinedSeqComparator.create(valueType, options),
valueEqualiserSupplier,
logDedupEqualSupplier,
mfFactory,
pathFactory(),
format2PathFactory(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;

import javax.annotation.Nullable;

import java.util.List;
import java.util.function.Supplier;
import java.util.stream.IntStream;

import static org.apache.paimon.codegen.CodeGenUtils.newRecordEqualiser;

Expand All @@ -34,12 +37,36 @@ public class ValueEqualiserSupplier implements SerializableSupplier<RecordEquali

private final List<DataType> fieldTypes;

private final int[] projection;

/**
 * Creates a supplier without a projection.
 *
 * @param keyType row type whose field types are handed to the generated equaliser
 */
public ValueEqualiserSupplier(RowType keyType) {
    // A null projection is what the two-argument constructor stores for "no projection".
    this(keyType, null);
}

/**
 * Creates a supplier with an explicit projection.
 *
 * @param keyType row type whose field types are handed to the generated equaliser
 * @param projection field indices passed on to {@code newRecordEqualiser}; when {@code null},
 *     {@link #get()} falls back to the variant that takes only the field types. The array is
 *     stored as given, not copied.
 */
public ValueEqualiserSupplier(RowType keyType, int[] projection) {
this.fieldTypes = keyType.getFieldTypes();
this.projection = projection;
}

@Override
public RecordEqualiser get() {
return newRecordEqualiser(fieldTypes);
return this.projection == null
? newRecordEqualiser(fieldTypes)
: newRecordEqualiser(fieldTypes, projection);
}

/**
 * Builds a supplier whose equaliser skips the given fields.
 *
 * @param rowType row type of the records to compare
 * @param ignoreFields names of the fields to exclude from the comparison; {@code null} means
 *     nothing is excluded
 * @return a supplier projecting onto every field of {@code rowType} whose name is not listed
 *     in {@code ignoreFields}, or a supplier without projection when {@code ignoreFields} is
 *     {@code null}
 */
public static ValueEqualiserSupplier fromIgnoreFields(
        RowType rowType, @Nullable List<String> ignoreFields) {
    // The parameter is declared @Nullable, but the projection helper dereferences it;
    // treat null as "ignore nothing" instead of failing with a NullPointerException.
    if (ignoreFields == null) {
        return new ValueEqualiserSupplier(rowType);
    }
    int[] projection = getProjectionWithIgnoreFields(rowType, ignoreFields);
    return new ValueEqualiserSupplier(rowType, projection);
}

/**
 * Computes the indices of the fields that take part in the comparison.
 *
 * @param rowType row type supplying the field names and the field count
 * @param ignoreFields names of the fields to leave out
 * @return ascending indices of all fields of {@code rowType} whose name is not contained in
 *     {@code ignoreFields}
 */
private static int[] getProjectionWithIgnoreFields(RowType rowType, List<String> ignoreFields) {
    List<String> names = rowType.getFieldNames();
    int total = rowType.getFieldCount();

    // Collect the surviving indices into a scratch array sized for the worst case.
    int[] kept = new int[total];
    int keptCount = 0;
    for (int i = 0; i < total; i++) {
        if (!ignoreFields.contains(names.get(i))) {
            kept[keptCount++] = i;
        }
    }

    // Shrink to the exact number of retained fields.
    int[] projection = new int[keptCount];
    System.arraycopy(kept, 0, projection, 0, keptCount);
    return projection;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@

import org.apache.paimon.KeyValue;
import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ValueEqualiserSupplier;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -31,6 +35,7 @@
import java.util.List;

import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link FullChangelogMergeFunctionWrapper}. */
public abstract class FullChangelogMergeFunctionWrapperTestBase {
Expand Down Expand Up @@ -214,5 +219,63 @@ public static class WithChangelogRowDeduplicateMergeFunctionTest
protected boolean changelogRowDeduplicate() {
return true;
}

/**
 * Checks the full-changelog wrapper with row deduplication enabled and an equaliser built by
 * {@code ValueEqualiserSupplier.fromIgnoreFields} that ignores field {@code f1}: rows that
 * differ only in {@code f1} are expected to produce no changelog, rows that differ in {@code
 * f0} are expected to produce an UPDATE_BEFORE / UPDATE_AFTER pair.
 */
@Test
public void testFullChangelogMergeFunctionWrapperWithIgnoreFields() {
// Two INT value fields; only f0 takes part in the comparison because f1 is ignored.
RowType valueType =
RowType.builder()
.fields(
new DataType[] {DataTypes.INT(), DataTypes.INT()},
new String[] {"f0", "f1"})
.build();
List<String> ignoreFields = Collections.singletonList("f1");
ValueEqualiserSupplier logDedupEqualSupplier =
ValueEqualiserSupplier.fromIgnoreFields(valueType, ignoreFields);
FullChangelogMergeFunctionWrapper function =
new FullChangelogMergeFunctionWrapper(
createMergeFunction(), MAX_LEVEL, logDedupEqualSupplier.get(), true);

// Case 1: a level-0 'insert' record on top of a max-level record that is the same except
// for the ignored field f1 (1 vs 2).
function.reset();
function.add(
new KeyValue()
.replace(row(1), 1, RowKind.INSERT, row(1, 1))
.setLevel(MAX_LEVEL));
function.add(new KeyValue().replace(row(1), 2, RowKind.INSERT, row(1, 2)).setLevel(0));
ChangelogResult result = function.getResult();
assertThat(result).isNotNull();
// No changelog is expected, since the rows only differ in the ignored field.
List<KeyValue> changelogs = result.changelogs();
assertThat(changelogs).isEmpty();
// The merged result is still expected to carry the level-0 row (1, 2).
KeyValue kv = result.result();
assertThat(kv).isNotNull();
assertThat(kv.valueKind()).isEqualTo(RowKind.INSERT);
assertThat(kv.value().getInt(0)).isEqualTo(1);
assertThat(kv.value().getInt(1)).isEqualTo(2);

// Case 2: a level-0 'insert' record on top of a max-level record that differs in the
// compared field f0 (1 vs 2).
function.reset();
function.add(
new KeyValue()
.replace(row(1), 1, RowKind.INSERT, row(1, 1))
.setLevel(MAX_LEVEL));
function.add(new KeyValue().replace(row(1), 2, RowKind.INSERT, row(2, 2)).setLevel(0));
result = function.getResult();
assertThat(result).isNotNull();
// Expect the old row as UPDATE_BEFORE followed by the new row as UPDATE_AFTER.
changelogs = result.changelogs();
assertThat(changelogs).hasSize(2);
assertThat(changelogs.get(0).valueKind()).isEqualTo(RowKind.UPDATE_BEFORE);
assertThat(changelogs.get(0).value().getInt(0)).isEqualTo(1);
assertThat(changelogs.get(0).value().getInt(1)).isEqualTo(1);
assertThat(changelogs.get(1).valueKind()).isEqualTo(RowKind.UPDATE_AFTER);
assertThat(changelogs.get(1).value().getInt(0)).isEqualTo(2);
assertThat(changelogs.get(1).value().getInt(1)).isEqualTo(2);
kv = result.result();
assertThat(kv).isNotNull();
assertThat(kv.valueKind()).isEqualTo(RowKind.INSERT);
assertThat(kv.value().getInt(0)).isEqualTo(2);
assertThat(kv.value().getInt(1)).isEqualTo(2);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@
import org.apache.paimon.mergetree.compact.aggregate.FieldLastValueAgg;
import org.apache.paimon.mergetree.compact.aggregate.FieldSumAgg;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.UserDefinedSeqComparator;
import org.apache.paimon.utils.ValueEqualiserSupplier;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;

Expand All @@ -41,6 +43,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -214,6 +217,72 @@ public void testDeduplicate(boolean changelogRowDeduplicate) {
assertThat(kv.value().getInt(0)).isEqualTo(2);
}

/**
 * Checks the lookup-changelog wrapper with row deduplication enabled and an equaliser built by
 * {@code ValueEqualiserSupplier.fromIgnoreFields} that ignores field {@code f1}, which is also
 * configured as the {@code sequence.field}: rows that differ only in {@code f1} are expected
 * to produce no changelog, rows that differ in {@code f0} are expected to produce an
 * UPDATE_BEFORE / UPDATE_AFTER pair.
 */
@Test
public void testDeduplicateWithIgnoreFields() {
// Lookup source handed to the wrapper via highLevel::get; it stays empty in this test.
Map<InternalRow, KeyValue> highLevel = new HashMap<>();
RowType valueType =
RowType.builder()
.fields(
new DataType[] {DataTypes.INT(), DataTypes.INT()},
new String[] {"f0", "f1"})
.build();
UserDefinedSeqComparator userDefinedSeqComparator =
UserDefinedSeqComparator.create(
valueType, CoreOptions.fromMap(ImmutableMap.of("sequence.field", "f1")));
assert userDefinedSeqComparator != null;
// Only f0 takes part in the comparison because f1 is ignored.
List<String> ignoreFields = Collections.singletonList("f1");
ValueEqualiserSupplier logDedupEqualSupplier =
ValueEqualiserSupplier.fromIgnoreFields(valueType, ignoreFields);
LookupChangelogMergeFunctionWrapper function =
new LookupChangelogMergeFunctionWrapper(
LookupMergeFunction.wrap(
DeduplicateMergeFunction.factory(),
RowType.of(DataTypes.INT()),
valueType),
highLevel::get,
logDedupEqualSupplier.get(),
true,
LookupStrategy.from(false, true, false, false),
null,
userDefinedSeqComparator);

// Case 1: a level-0 'insert' record on top of a level-x (x > 0) record that is the same
// except for the ignored field f1 (1 vs 2).
function.reset();
function.add(new KeyValue().replace(row(1), 1, INSERT, row(1, 1)).setLevel(2));
function.add(new KeyValue().replace(row(1), 2, INSERT, row(1, 2)).setLevel(0));
ChangelogResult result = function.getResult();
assertThat(result).isNotNull();
// No changelog is expected, since the rows only differ in the ignored field.
List<KeyValue> changelogs = result.changelogs();
assertThat(changelogs).isEmpty();
// The merged result is still expected to carry the level-0 row (1, 2).
KeyValue kv = result.result();
assertThat(kv).isNotNull();
assertThat(kv.valueKind()).isEqualTo(INSERT);
assertThat(kv.value().getInt(0)).isEqualTo(1);
assertThat(kv.value().getInt(1)).isEqualTo(2);

// Case 2: a level-0 'insert' record on top of a level-x (x > 0) record that differs in
// the compared field f0 (1 vs 2).
function.reset();
function.add(new KeyValue().replace(row(1), 1, INSERT, row(1, 1)).setLevel(1));
function.add(new KeyValue().replace(row(1), 2, INSERT, row(2, 2)).setLevel(0));
result = function.getResult();
assertThat(result).isNotNull();
// Expect the old row as UPDATE_BEFORE followed by the new row as UPDATE_AFTER.
changelogs = result.changelogs();
assertThat(changelogs).hasSize(2);
assertThat(changelogs.get(0).valueKind()).isEqualTo(UPDATE_BEFORE);
assertThat(changelogs.get(0).value().getInt(0)).isEqualTo(1);
assertThat(changelogs.get(0).value().getInt(1)).isEqualTo(1);
assertThat(changelogs.get(1).valueKind()).isEqualTo(UPDATE_AFTER);
assertThat(changelogs.get(1).value().getInt(0)).isEqualTo(2);
assertThat(changelogs.get(1).value().getInt(1)).isEqualTo(2);
kv = result.result();
assertThat(kv).isNotNull();
assertThat(kv.valueKind()).isEqualTo(INSERT);
assertThat(kv.value().getInt(0)).isEqualTo(2);
assertThat(kv.value().getInt(1)).isEqualTo(2);
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testSum(boolean changelogRowDeduplicate) {
Expand Down
Loading