Skip to content

Commit

Permalink
[cdc] Correct equalsIgnoreFieldId in UpdatedDataFieldsProcessFunction…
Browse files Browse the repository at this point in the history
…Base
  • Loading branch information
JingsongLi committed Dec 18, 2024
1 parent c93040e commit f3813f0
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public boolean equals(Object o) {
}

@Override
public boolean equalsIgnoreFieldId(Object o) {
public boolean equalsIgnoreFieldId(DataType o) {
if (this == o) {
return true;
}
Expand Down
27 changes: 15 additions & 12 deletions paimon-common/src/main/java/org/apache/paimon/types/DataType.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonGenerator;

import javax.annotation.Nonnull;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
Expand Down Expand Up @@ -124,15 +122,6 @@ public final DataType copy() {
return copy(isNullable);
}

/**
* Compare two data types without nullable.
*
* @param o the target data type
*/
public boolean equalsIgnoreNullable(@Nonnull DataType o) {
return Objects.equals(this.copy(true), o.copy(true));
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -145,7 +134,21 @@ public boolean equals(Object o) {
return isNullable == that.isNullable && typeRoot == that.typeRoot;
}

public boolean equalsIgnoreFieldId(Object o) {
/**
* Compare two data types without nullable.
*
* @param o the target data type
*/
public boolean equalsIgnoreNullable(DataType o) {
return Objects.equals(this.copy(true), o.copy(true));
}

/**
* Compare two data types without field id.
*
* @param o the target data type
*/
public boolean equalsIgnoreFieldId(DataType o) {
return equals(o);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public boolean equals(Object o) {
}

@Override
public boolean equalsIgnoreFieldId(Object o) {
public boolean equalsIgnoreFieldId(DataType o) {
if (this == o) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ public boolean equals(Object o) {
return fields.equals(rowType.fields);
}

public boolean equalsIgnoreFieldId(Object o) {
@Override
public boolean equalsIgnoreFieldId(DataType o) {
if (this == o) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void processElement(
extractSchemaChanges(schemaManager, actualUpdatedDataFields)) {
applySchemaChange(schemaManager, schemaChange, identifier);
}
/**
/*
* Here, actualUpdatedDataFields cannot be used to update latestFields because there is a
* non-SchemaChange.AddColumn scenario. Otherwise, the previously existing fields cannot be
* modified again.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,11 @@ protected List<SchemaChange> extractSchemaChanges(
String newFieldName = StringUtils.toLowerCaseIfNeed(newField.name(), caseSensitive);
if (oldFields.containsKey(newFieldName)) {
DataField oldField = oldFields.get(newFieldName);
// we compare by ignoring nullable, because partition keys and primary keys might be
// nullable in source database, but they can't be null in Paimon
if (oldField.type().equalsIgnoreNullable(newField.type())) {
// 1. we compare by ignoring nullable, because partition keys and primary keys might
// be nullable in source database, but they can't be null in Paimon
// 2. we compare by ignoring field id, the field ID is newly created and may be
// different, we should ignore it
if (oldField.type().copy(true).equalsIgnoreFieldId(newField.type().copy(true))) {
// update column comment
if (newField.description() != null
&& !newField.description().equals(oldField.description())) {
Expand Down

0 comments on commit f3813f0

Please sign in to comment.