SNOW-1694636: Schematization utils refactor (PART 3) - Refactor schema resolver #944

Merged

Conversation

sfc-gh-wtrefon
Contributor

Overview

SNOW-1694636

Pre-review checklist

  • This change should be part of a Behavior Change Release. See go/behavior-change.
  • This change has passed Merge gate tests
  • Snowpipe Changes
  • Snowpipe Streaming Changes
  • This change is TEST-ONLY
  • This change is README/Javadocs only
  • This change is protected by a config parameter <PARAMETER_NAME> eg snowflake.ingestion.method.
    • Yes - Added end to end and Unit Tests.
    • No - Suggest why it is not param protected
  • Is this change protected by parameter <PARAMETER_NAME> on the server side?
    • The parameter/feature is not yet active in production (partial rollout or PrPr, see Changes for Unreleased Features and Fixes).
    • If there is an issue, it can be safely mitigated by turning the parameter off. This is also verified by a test (See go/ppp).

sfc-gh-wtrefon requested a review from a team as a code owner October 3, 2024 14:20
Comment on lines 60 to 99
private TableSchema getTableSchemaFromRecordSchema(
    JsonNode recordNode, Set<String> columnNamesSet, SinkRecord record) {
  Map<String, ColumnInfos> schemaMap = getFullSchemaMapFromRecord(record);
  Map<String, ColumnInfos> columnsInferredFromSchema =
      Streams.stream(recordNode.fields())
          .map(ColumnValuePair::of)
          .filter(pair -> columnNamesSet.contains(pair.getQuotedColumnName()))
          .peek(
              field -> {
                if (!schemaMap.containsKey(field.getColumnName())) {
                  // only when the type of the value is unrecognizable for JAVA
                  throw SnowflakeErrors.ERROR_5022.getException(
                      "column: " + field.getColumnName() + " schemaMap: " + schemaMap);
                }
              })
          .map(
              field ->
                  Maps.immutableEntry(
                      Utils.quoteNameIfNeeded(field.getQuotedColumnName()),
                      schemaMap.get(field.getColumnName())))
          .collect(
              Collectors.toMap(
                  Map.Entry::getKey, Map.Entry::getValue, (oldValue, newValue) -> newValue));
  return new TableSchema(columnsInferredFromSchema);
}

private TableSchema getTableSchemaFromJson(JsonNode recordNode, Set<String> columnNamesSet) {
  Map<String, ColumnInfos> columnsInferredFromJson =
      Streams.stream(recordNode.fields())
          .map(ColumnValuePair::of)
          .filter(pair -> columnNamesSet.contains(pair.getQuotedColumnName()))
          .map(
              pair ->
                  Maps.immutableEntry(
                      pair.getQuotedColumnName(),
                      new ColumnInfos(inferDataTypeFromJsonObject(pair.getJsonNode()))))
          .collect(
              Collectors.toMap(
                  Map.Entry::getKey, Map.Entry::getValue, (oldValue, newValue) -> newValue));
Contributor Author

Open to suggestions on how to make this refactoring nicer. Java 8 does not help; Kotlin would be perfect here ;(

Contributor Author

In theory we could split schemaFromJson and schemaFromRecordSchema into two separate classes, but I wouldn't invest much more here, as this class is likely to change completely for the complex Iceberg types.

sfc-gh-wtrefon force-pushed the wtrefon/SNOW-1694636-refactor-schema-resolver branch from 869aa4e to 0b3b5a4 October 3, 2024 14:22
sfc-gh-wtrefon force-pushed the wtrefon/SNOW-1694636-refactor-schema-resolver branch from 0b3b5a4 to 5cd5762 October 3, 2024 14:31
@@ -12,6 +12,11 @@ public ColumnInfos(String columnType, String comments) {
this.comments = comments;
}

public ColumnInfos(String columnType) {
Contributor

Why the plural form here?

Contributor Author

You are the author of this class :D

Contributor Author

And this is a constructor, so it has to match the class name.

Contributor

So that's my mistake, sorry about that 😄

if (columnNames == null) {
public TableSchema resolveTableSchemaFromRecord(
SinkRecord record, List<String> columnsToInclude) {
if (columnsToInclude == null || columnsToInclude.isEmpty()) {
return new TableSchema(new HashMap<>());
Contributor

sfc-gh-akowalczyk Oct 4, 2024

Suggested change
- return new TableSchema(new HashMap<>());
+ return new TableSchema(ImmutableMap.of());

Why do we not use an immutable map here?
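
To illustrate the concern behind this suggestion, here is a minimal sketch using only the JDK. It assumes a simplified, hypothetical `TableSchema` that hands its map back to callers (the real class may not), and it uses `Collections.emptyMap()` as a JDK stand-in for Guava's `ImmutableMap.of()`. Both reject writes with `UnsupportedOperationException`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class EmptySchemaSketch {

  /** Hypothetical, simplified stand-in for the connector's TableSchema. */
  static final class TableSchema {
    private final Map<String, String> columnInfos;

    TableSchema(Map<String, String> columnInfos) {
      this.columnInfos = columnInfos;
    }

    Map<String, String> getColumnInfos() {
      return columnInfos;
    }
  }

  /** Empty schema backed by a mutable HashMap, as in the original line. */
  static TableSchema emptyMutable() {
    return new TableSchema(new HashMap<>());
  }

  /** Empty schema backed by an immutable map (JDK analogue of ImmutableMap.of()). */
  static TableSchema emptyImmutable() {
    return new TableSchema(Collections.emptyMap());
  }

  public static void main(String[] args) {
    // A caller can silently add entries to the HashMap-backed "empty" schema.
    TableSchema mutable = emptyMutable();
    mutable.getColumnInfos().put("OOPS", "VARCHAR");
    System.out.println("mutable size after put: " + mutable.getColumnInfos().size());

    // The immutable variant fails loudly on the same write.
    try {
      emptyImmutable().getColumnInfos().put("OOPS", "VARCHAR");
    } catch (UnsupportedOperationException e) {
      System.out.println("immutable map rejected the write");
    }
  }
}
```

Whether this matters in the connector depends on whether callers ever receive the underlying map; the sketch only shows why an immutable empty value is the safer default.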

private final String quotedColumnName;
private final JsonNode jsonNode;

public static ColumnValuePair of(Map.Entry<String, JsonNode> field) {
Contributor

The name `of` suggests we construct the object directly from the given value. Here, however, we apply a conversion (extracting fields from the entry), so `from` would be more suitable.
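
A rough sketch of what the suggested naming could look like. The class below is a hypothetical simplification: it stores a plain `String` value instead of a `JsonNode`, and its quoting rule is a placeholder chosen for illustration, not the connector's actual logic.

```java
import java.util.AbstractMap;
import java.util.Map;

class ColumnValuePairSketch {
  private final String columnName;
  private final String quotedColumnName;
  private final String value;

  private ColumnValuePairSketch(String columnName, String quotedColumnName, String value) {
    this.columnName = columnName;
    this.quotedColumnName = quotedColumnName;
    this.value = value;
  }

  /**
   * Named "from" because it converts another type (a map entry) and derives
   * extra fields, rather than simply wrapping the given values.
   */
  static ColumnValuePairSketch from(Map.Entry<String, String> field) {
    String columnName = field.getKey();
    // Placeholder quoting rule, assumed for illustration only.
    String quotedColumnName = "\"" + columnName + "\"";
    return new ColumnValuePairSketch(columnName, quotedColumnName, field.getValue());
  }

  String getColumnName() {
    return columnName;
  }

  String getQuotedColumnName() {
    return quotedColumnName;
  }

  String getValue() {
    return value;
  }

  public static void main(String[] args) {
    ColumnValuePairSketch pair = from(new AbstractMap.SimpleEntry<>("id", "42"));
    System.out.println(pair.getColumnName() + " -> " + pair.getQuotedColumnName());
  }
}
```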

Contributor

sfc-gh-akowalczyk left a comment

Only a few comments, all related to naming conventions.
The code looks much better with this refactoring applied.

Contributor

sfc-gh-achyzy left a comment

Looks good, although consider replacing the peek operation on the stream.

Streams.stream(recordNode.fields())
.map(ColumnValuePair::of)
.filter(pair -> columnNamesSet.contains(pair.getQuotedColumnName()))
.peek(
Contributor

I'm not a fan of peek, as the docs explicitly state: "This method exists mainly to support debugging, where you want to see the elements as they flow past a certain point in a pipeline."
Maybe it would be better to partition/group it into separate streams?

Contributor Author

partitioningBy looks OK
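
The partitioning idea discussed here can be sketched with `Collectors.partitioningBy` from the JDK. This is a minimal illustration under stated assumptions, not the code that was merged: plain strings stand in for `ColumnInfos`, `IllegalStateException` stands in for `SnowflakeErrors.ERROR_5022`, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class PartitionInsteadOfPeek {

  /**
   * Maps each field name to its type from schemaMap and fails if any field
   * has no schema entry. Strings stand in for the connector's ColumnInfos.
   */
  static Map<String, String> resolveColumns(
      List<String> fieldNames, Map<String, String> schemaMap) {
    // Split fields into those with a schema entry (true) and those without (false).
    Map<Boolean, List<String>> partitioned =
        fieldNames.stream().collect(Collectors.partitioningBy(schemaMap::containsKey));

    // Validation now happens outside the pipeline instead of inside peek().
    List<String> missing = partitioned.get(false);
    if (!missing.isEmpty()) {
      throw new IllegalStateException("columns: " + missing + " schemaMap: " + schemaMap);
    }

    // Same merge function as in the PR: on duplicate keys, the later value wins.
    return partitioned.get(true).stream()
        .collect(
            Collectors.toMap(name -> name, schemaMap::get, (oldValue, newValue) -> newValue));
  }

  public static void main(String[] args) {
    Map<String, String> schemaMap = new HashMap<>();
    schemaMap.put("ID", "INT");
    schemaMap.put("NAME", "VARCHAR");
    System.out.println(resolveColumns(Arrays.asList("ID", "NAME"), schemaMap));
  }
}
```

One behavioral difference from the `peek` version: the sketch reports every unrecognized column in a single exception rather than stopping at the first one encountered.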

sfc-gh-wtrefon force-pushed the wtrefon/SNOW-1694636-refactor-schema-resolver branch from 5e72906 to 948b8b9 October 4, 2024 11:05
sfc-gh-wtrefon enabled auto-merge (squash) October 4, 2024 11:30
sfc-gh-wtrefon merged commit 3bf31ff into master Oct 4, 2024
78 of 80 checks passed
sfc-gh-wtrefon deleted the wtrefon/SNOW-1694636-refactor-schema-resolver branch October 4, 2024 12:43