
SNOW-1729292 modify iceberg tree based on record data #1007

Conversation

@sfc-gh-bzabek (Contributor) commented Nov 21, 2024

Overview

SNOW-1729292

There is still a lot of work to be done. Let me know whether you find this approach sound.

  1. Ingest-sdk returns a list of columnsToEvolve inside an InsertError
  2. get schemas from the channel for these columns
  3. get column schemas from the record
  4. resolve which columns are new (see the sketch after this list)
  • if a column from the record is not present in the channel's schema, then it's new
  • add this column
  5. resolve which columns are modified
  • if the column is present in the channel, it means it's not new - hence it's to be modified
  • merge the column from the channel with the column from the record
  • set the data type
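
A minimal sketch of steps 4-5, assuming hypothetical stand-ins for anything not shown in this PR (resolveIcebergSchemaFromChannel, record, channel and columnsToEvolve are illustrative only; resolveIcebergSchemaFromRecord and getColumnName appear in the snippets further down):

  // Sketch only: split the columns reported by the ingest-sdk into "new" and "modified".
  List<IcebergColumnTree> fromRecord =
      icebergTableSchemaResolver.resolveIcebergSchemaFromRecord(record, columnsToEvolve);
  List<IcebergColumnTree> fromChannel =
      icebergTableSchemaResolver.resolveIcebergSchemaFromChannel(channel, columnsToEvolve); // hypothetical

  List<IcebergColumnTree> columnsToAdd = new ArrayList<>();
  List<IcebergColumnTree> columnsToModify = new ArrayList<>();
  for (IcebergColumnTree recordColumn : fromRecord) {
    boolean existsInChannel =
        fromChannel.stream()
            .anyMatch(c -> c.getColumnName().equalsIgnoreCase(recordColumn.getColumnName()));
    if (existsInChannel) {
      columnsToModify.add(recordColumn); // not new: merge with the channel column and set the data type
    } else {
      columnsToAdd.add(recordColumn); // absent from the channel's schema: a brand new column
    }
  }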

The whole logic responsible for Iceberg schema evolution is package-private. Only IcebergSchemaEvolutionService is public.

It doesn't work with ingest-sdk 3.0.0; a local jar needs to be built from the ingest-sdk master branch.

TODO:

  • I didn't check whether column nullability evolution is handled
  • column comments are not handled
  • tests for potential SQL injection (I think we need another Jira for it)

Pre-review checklist

  • This change should be part of a Behavior Change Release. See go/behavior-change.
  • This change has passed Merge gate tests
  • Snowpipe Changes
  • Snowpipe Streaming Changes
  • This change is TEST-ONLY
  • This change is README/Javadocs only
  • This change is protected by a config parameter <PARAMETER_NAME> eg snowflake.ingestion.method.
    • Yes - Added end to end and Unit Tests.
    • No - Suggest why it is not param protected
  • Is this change protected by parameter <PARAMETER_NAME> on the server side?
    • The parameter/feature is not yet active in production (partial rollout or PrPr, see Changes for Unreleased Features and Fixes).
    • If there is an issue, it can be safely mitigated by turning the parameter off. This is also verified by a test (See go/ppp).


public IcebergSchemaEvolutionService(SnowflakeConnectionService conn) {
  this.conn = conn;
  this.tableSchemaResolver = new IcebergTableSchemaResolver();
  this.icebergTableSchemaResolver = new IcebergTableSchemaResolver();
}

@VisibleForTesting
Contributor:

Actually it is not used anymore. You can safely delete it.

Contributor Author:

I'm confused. I use icebergTableSchemaResolver a couple of times. (?)

Contributor:

I mean the constructor below marked with @VisibleForTesting (not affected by your changes).

import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class ParseIcebergColumnTreeTest {
Contributor Author:

This class was moved to the correct module.

@sfc-gh-bzabek sfc-gh-bzabek marked this pull request as ready for review November 29, 2024 12:00
@sfc-gh-bzabek sfc-gh-bzabek requested a review from a team as a code owner November 29, 2024 12:00
return new IcebergColumnTree(rootNode);
}

IcebergColumnTree fromConnectSchema(Field kafkaConnectField) {
Contributor:

We have two separate flows: JSON without a schema and Avro/Protobuf.

I would extract fromConnectSchema to a separate class SchematizedIcebergColumnTreeFactory and rename this class to NoSchemaIcebergColumnTreeFactory. WDYT?

Contributor Author:

Hmm, so here is what we have:

  • from the Iceberg (channel) schema - used before both workflows
  • from the JSON payload - the workflow without a schema
  • from the record schema - right now there is only the fromConnectSchema method. Do we also have to write separate logic to parse Avro and Protobuf schemas? I think not, because the Converter will parse them into a Connect schema when we encounter Avro or Protobuf. (We must test it, for sure.)

When I wrote the factory, I thought it would be a bit of over-engineering to split it. However, if we end up needing more methods, then sure. Logically we have three parts (see the sketch below).
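
A rough sketch of that split, for illustration only (the class names come from the suggestion above; the method shapes and parameter types are hypothetical placeholders):

  // Sketch only: one factory per workflow instead of one class with both entry points.
  class NoSchemaIcebergColumnTreeFactory {
    // JSON-without-schema workflow: build the tree from the raw payload node.
    IcebergColumnTree fromJson(com.fasterxml.jackson.databind.JsonNode recordNode) {
      throw new UnsupportedOperationException("sketch only");
    }
  }

  class SchematizedIcebergColumnTreeFactory {
    // Avro/Protobuf workflow: the Converter has already produced a Connect schema.
    IcebergColumnTree fromConnectSchema(org.apache.kafka.connect.data.Field kafkaConnectField) {
      throw new UnsupportedOperationException("sketch only");
    }
  }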

List<IcebergColumnTree> modifiedOrAddedColumns =
    icebergTableSchemaResolver.resolveIcebergSchemaFromRecord(record, columnsToEvolve);

List<IcebergColumnTree> columnsToAdd =
Contributor:

There is no need to iterate twice over the list. The columns from modifiedOrAddedColumns are either modified or added, right?

Contributor Author (@sfc-gh-bzabek, Dec 2, 2024):

Right, however I found my approach simpler and didn't care very much about the performance of it.

Create two lists and write an if to put each element into one list or the other:

  private Pair<List<IcebergColumnTree>, List<IcebergColumnTree>> distinguish(
      List<IcebergColumnTree> alreadyExistingColumns,
      List<IcebergColumnTree> modifiedOrAddedColumns) {

    ArrayList<IcebergColumnTree> columnsToModify = new ArrayList<>();
    ArrayList<IcebergColumnTree> columnsToAdd = new ArrayList<>();

    for (IcebergColumnTree tree : modifiedOrAddedColumns) {
      if (alreadyExistingColumns.stream()
          .anyMatch(alreadyExisting ->
              alreadyExisting.getColumnName().equalsIgnoreCase(tree.getColumnName()))) {
        // already present in the channel schema, so it is a modification
        columnsToModify.add(tree);
      } else {
        columnsToAdd.add(tree);
      }
    }
    return new Pair<>(columnsToAdd, columnsToModify);
  }

But there is no "Pair" in JDK 8.
This approach is a bit messier, though slightly more performant.

(It's messy ->) The second alternative I see is to first distinguish columnsToAdd, and then, having columnsToAdd, iterate over the columns to evolve and again match them against the modifiedOrAddedColumns list.

Contributor Author:

None is perfect.

Contributor:

For me it's a matter of avoiding code duplication rather than optimizing the code execution time. I don't see any problem with creating a wrapper class for two lists.
Anyway it's not a blocker to me.
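
For illustration, a small wrapper class along these lines (the name and fields are hypothetical) would avoid the missing Pair type on JDK 8 while keeping a single pass over the list:

  // Hypothetical wrapper for the two result lists; no external Pair type needed on JDK 8.
  class ColumnsToEvolve {
    final List<IcebergColumnTree> columnsToAdd;
    final List<IcebergColumnTree> columnsToModify;

    ColumnsToEvolve(List<IcebergColumnTree> columnsToAdd, List<IcebergColumnTree> columnsToModify) {
      this.columnsToAdd = columnsToAdd;
      this.columnsToModify = columnsToModify;
    }
  }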

.filter(
    modifiedOrAddedColumn ->
        alreadyExistingColumns.stream()
            .noneMatch(
Contributor:

It would be easier to convert alreadyExistingColumns from List to Set.

Contributor Author:

I can use Set everywhere instead of a list.

Contributor Author:

But I didn't implement equals or hashCode for IcebergColumnTree. I'm not sure it's needed.

Contributor:

Good point. Is there any problem with implementing these methods, or does the final code just not look cleaner?

Contributor Author:

If we implement them only for columnName, there is no problem. Using both fields doesn't make sense.
I don't think using a Set will change anything in the logic. It may (though it shouldn't) sneakily replace a column somewhere... I can change it, never say never.
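
A minimal sketch of what columnName-only identity could look like on IcebergColumnTree, assuming the getColumnName accessor used in the snippet above and case-insensitive comparison to match the equalsIgnoreCase checks:

  // Sketch only: identity based solely on the (case-insensitive) column name, not the whole tree.
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof IcebergColumnTree)) {
      return false;
    }
    IcebergColumnTree other = (IcebergColumnTree) o;
    return getColumnName().equalsIgnoreCase(other.getColumnName());
  }

  @Override
  public int hashCode() {
    return getColumnName().toLowerCase(java.util.Locale.ROOT).hashCode();
  }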

+ "}";
}

static String nestedObjectsPayload =
Contributor:

It can be private.

Contributor Author:

Let's keep it consistent with the others. I don't want to restrict its usage.

false));
}

private static final String RECORD_METADATA_TYPE =
Contributor:

Let's move it to the beginning of the class.

Contributor Author:

Done

Contributor (@sfc-gh-mbobowski) left a comment:

I still see some space for minor improvements, but I am ok with merging it at this point.

@@ -51,6 +49,8 @@ public void setUp() {
config.put(ICEBERG_ENABLED, "TRUE");
config.put(ENABLE_SCHEMATIZATION_CONFIG, isSchemaEvolutionEnabled().toString());
config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "true");
// "snowflake.streaming.max.client.lag" = 1 second, for faster tests
config.put(SNOWPIPE_STREAMING_MAX_CLIENT_LAG, "1");
Contributor:

nice

Contributor (@sfc-gh-wtrefon) left a comment:

Looking much better now; I really like how you split the IcebergFieldNode class. Good job!

@sfc-gh-bzabek sfc-gh-bzabek merged commit 85db567 into master Dec 2, 2024
53 of 54 checks passed
@sfc-gh-bzabek sfc-gh-bzabek deleted the bzabek-SNOW-1729292-modify-iceberg-tree-based-on-record-data branch December 2, 2024 14:30