Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[paimon-flink-cdc] Add the latest_schema state at schema evolution operator ,Reduce the latest schema access frequency #4535

Merged
merged 6 commits into from
Nov 28, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,20 @@
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;

import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
* A {@link ProcessFunction} to handle schema changes. New schema is represented by a list of {@link
Expand All @@ -37,25 +46,65 @@
* be 1.
*/
public class UpdatedDataFieldsProcessFunction
extends UpdatedDataFieldsProcessFunctionBase<List<DataField>, Void> {
extends UpdatedDataFieldsProcessFunctionBase<List<DataField>, Void>
implements CheckpointedFunction {
Copy link
Contributor

@JingsongLi JingsongLi Nov 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't need to be a CheckpointedFunction.


private final SchemaManager schemaManager;

private final Identifier identifier;

private ListState<DataField> latestSchemaListState;
private List<DataField> latestSchemaList;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

latestSchemaListState cannot be set to final because it cannot be initialized in the constructor

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean latestSchemaList: 'latestSchemaList' may be 'final', not latestSchemaListState.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, thanks.


/**
 * Creates the function and starts with an empty in-memory list of known fields.
 *
 * @param schemaManager schema manager stored for later use by this function
 * @param identifier identifier stored for later use when schema changes are applied
 * @param catalogLoader catalog loader handed to the base class constructor
 */
public UpdatedDataFieldsProcessFunction(
        SchemaManager schemaManager, Identifier identifier, Catalog.Loader catalogLoader) {
    super(catalogLoader);
    // The list starts empty; it is filled later, outside the constructor.
    this.latestSchemaList = new ArrayList<>();
    this.identifier = identifier;
    this.schemaManager = schemaManager;
}

@Override
public void processElement(
List<DataField> updatedDataFields, Context context, Collector<Void> collector)
throws Exception {
for (SchemaChange schemaChange : extractSchemaChanges(schemaManager, updatedDataFields)) {
List<DataField> actualUpdatedDataFields =
updatedDataFields.stream()
.filter(dataField -> !dataFieldContainIgnoreId(dataField))
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(actualUpdatedDataFields)) {
return;
}
for (SchemaChange schemaChange :
extractSchemaChanges(schemaManager, actualUpdatedDataFields)) {
applySchemaChange(schemaManager, schemaChange, identifier);
}
actualUpdatedDataFields.forEach(field -> latestSchemaList.add(field));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

latestSchemaList.addAll(actualUpdatedDataFields);

}

@Override
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a unit test (UT).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added: SchemaEvolutionTest

// Stores the in-memory field list (latestSchemaList) into latestSchemaListState
// when a snapshot is taken. The snapshot context argument is not used.
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
    // ListState#update replaces the previously stored values with the given list,
    // so a separate clear() beforehand is unnecessary.
    latestSchemaListState.update(latestSchemaList);
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
latestSchemaListState =
context.getOperatorStateStore()
.getListState(
new ListStateDescriptor<>(
"latest-schema-list-state", DataField.class));
if (context.isRestored()) {
latestSchemaListState.get().forEach(dataField -> latestSchemaList.add(dataField));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

latestSchemaListState.get().forEach(latestSchemaList::add);

} else {
RowType oldRowType = schemaManager.latest().get().logicalRowType();
oldRowType.getFields().forEach(dataField -> latestSchemaList.add(dataField));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

latestSchemaList.addAll(oldRowType.getFields());

}
}

private boolean dataFieldContainIgnoreId(DataField dataField) {
return latestSchemaList.stream()
Copy link
Contributor

@JingsongLi JingsongLi Nov 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (latestSchemaList == null) {
     RowType rowType = schemaManager.latest().get().logicalRowType();
     latestSchemaList.addAll(rowType.getFields());
}

.anyMatch(previous -> DataField.dataFieldEqualsIgnoreId(previous, dataField));
}
}
Loading