Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[paimon-flink-cdc] Add the latest_schema state at schema evolution operator ,Reduce the latest schema access frequency #4535

Merged
merged 6 commits into from
Nov 28, 2024

Conversation

GangYang-HX
Copy link
Contributor

@GangYang-HX GangYang-HX commented Nov 15, 2024

Purpose

Linked issue: Issue-4521

In scenarios where the number of Paimon table fields is large and the Write concurrency is high, reduce the Latest-Schema access frequency to improve the throughput of job cold start

Tests

Case-1: Observe whether the checkpoint time of schema evolution changes
image
image
image
Conclusion: After optimization, Schema Evolution is basically completed in seconds, or even milliseconds.

Case-2: Observe the log to see if there are still a large number of read schema behaviors
image
Conclusion: From hundreds of thousands to 115 times

API and Format

org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction#processElement

Documentation

Before the Schema Evolution operator calls org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunctionBase#extractSchemaChanges, add a judgment to confirm whether the field update really needs to be triggered.

  1. Add a List variable to determine whether it is an updated column: List latestSchemaList
  2. Add a state ListState. When the task is restored from the state, it is directly restored from here: ListState latestSchemaListState


private final SchemaManager schemaManager;

private final Identifier identifier;

private ListState<DataField> latestSchemaListState;
private List<DataField> latestSchemaList;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

latestSchemaListState cannot be set to final because it cannot be initialized in the constructor

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean latestSchemaList 'latestSchemaList' may be 'final', not latestSchemaListState.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok,thanks

actualUpdatedDataFields.forEach(field -> latestSchemaList.add(field));
}

@Override
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add a ut

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added: SchemaEvolutionTest

latestSchemaListState.get().forEach(dataField -> latestSchemaList.add(dataField));
} else {
RowType oldRowType = schemaManager.latest().get().logicalRowType();
oldRowType.getFields().forEach(dataField -> latestSchemaList.add(dataField));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

latestSchemaList.addAll(oldRowType.getFields());

applySchemaChange(schemaManager, schemaChange, identifier);
}
actualUpdatedDataFields.forEach(field -> latestSchemaList.add(field));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

latestSchemaList.addAll(actualUpdatedDataFields);

new ListStateDescriptor<>(
"latest-schema-list-state", DataField.class));
if (context.isRestored()) {
latestSchemaListState.get().forEach(dataField -> latestSchemaList.add(dataField));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

latestSchemaListState.get().forEach(latestSchemaList::add);

@JingsongLi
Copy link
Contributor

Hi @GangYang-HX , do we really need to put the schema to the state? Maybe just a field in memory is OK?

@GangYang-HX
Copy link
Contributor Author

Hi @GangYang-HX , do we really need to put the schema to the state? Maybe just a field in memory is OK?

Well, I have considered it before. If the state is not added when restoring from the state, there will be one more latest-schame access.
In fact, one more access should not have much impact. I will adjust it later.

@@ -37,25 +44,54 @@
* be 1.
*/
public class UpdatedDataFieldsProcessFunction
extends UpdatedDataFieldsProcessFunctionBase<List<DataField>, Void> {
extends UpdatedDataFieldsProcessFunctionBase<List<DataField>, Void>
implements CheckpointedFunction {
Copy link
Contributor

@JingsongLi JingsongLi Nov 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't need to be CheckpointedFunction


private final SchemaManager schemaManager;

private final Identifier identifier;

private final List<DataField> latestSchemaList;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nullable
private List latestSchemaList

}

private boolean dataFieldContainIgnoreId(DataField dataField) {
return latestSchemaList.stream()
Copy link
Contributor

@JingsongLi JingsongLi Nov 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (latestSchemaList == null) {
     RowType rowType = schemaManager.latest().get().logicalRowType();
     latestSchemaList.addAll(rowType.getFields());
}


private final SchemaManager schemaManager;

private final Identifier identifier;

private final List<DataField> latestSchemaList;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it can be:

Set<FieldIdentifier> latestFields;

FieldIdentifier {
    String name;
    DataType type;
    String description;
}

* non-SchemaChange.AddColumn scenario. Otherwise, the previously existing fields cannot be
* modified again.
*/
updateLatestFields();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just add updatedDataFields to latestFields?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this is the logic in previous versions, but there are risks in actual testing.
Reason: FieldIdentifier only guarantees the uniqueness of <name, type, description>. If any of the attributes is adjusted repeatedly, it will lead to misjudgment.
For example: Field A's type is int, and then it is changed to string. If you want to adjust it to int again, it will not work,because latestFields has already saved an element like <A, int, description>.

@JingsongLi
Copy link
Contributor

+1

@JingsongLi JingsongLi merged commit 2f93b7b into apache:master Nov 28, 2024
12 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants