SNOW-1737840: Adapt record mapping in RecordService #969

Merged

Conversation

sfc-gh-wtrefon (Contributor, PR author)

Overview

SNOW-1737840

Pre-review checklist

  • This change should be part of a Behavior Change Release. See go/behavior-change.
  • This change has passed Merge gate tests
  • Snowpipe Changes
  • Snowpipe Streaming Changes
  • This change is TEST-ONLY
  • This change is README/Javadocs only
  • This change is protected by a config parameter <PARAMETER_NAME>, e.g. snowflake.ingestion.method.
    • Yes - Added end-to-end and unit tests.
    • No - Suggest why it is not param protected
  • Is this change protected by parameter <PARAMETER_NAME> on the server side?
    • The parameter/feature is not yet active in production (partial rollout or PrPr, see Changes for Unreleased Features and Fixes).
    • If there is an issue, it can be safely mitigated by turning the parameter off. This is also verified by a test (See go/ppp).

Comment on lines +37 to +39
if (includeAllMetadata) {
  streamingIngestRow.put(TABLE_COLUMN_METADATA, mapper.writeValueAsString(row.getMetadata()));
}

Contributor Author (sfc-gh-wtrefon):

Not sure if there was some trick here, but I moved it from the forEach loop above to here. I don't see any point in serializing and putting the metadata all over again for every JsonNode. This should improve overall processing speed, since we no longer serialize it for every JSON node.
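
To illustrate the pattern (a simplified sketch with hypothetical method and field names, not the exact connector code; it assumes Jackson's ObjectMapper as in the snippet above):

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MetadataSerializationSketch {

  private static final String TABLE_COLUMN_METADATA = "RECORD_METADATA";
  private final ObjectMapper mapper = new ObjectMapper();

  Map<String, Object> mapRow(List<JsonNode> jsonNodes, Object metadata, boolean includeAllMetadata)
      throws JsonProcessingException {
    Map<String, Object> streamingIngestRow = new HashMap<>();

    // Serialize the metadata exactly once, outside the per-node loop ...
    if (includeAllMetadata) {
      streamingIngestRow.put(TABLE_COLUMN_METADATA, mapper.writeValueAsString(metadata));
    }

    // ... instead of calling mapper.writeValueAsString(metadata) inside this loop for every node.
    for (JsonNode node : jsonNodes) {
      node.fields().forEachRemaining(entry -> streamingIngestRow.put(entry.getKey(), entry.getValue()));
    }
    return streamingIngestRow;
  }
}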

Contributor:

I think I even saw a tech-debt ticket about this repeated serialization. It's great that you've optimized it.

import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

class IcebergTableStreamingRecordMapperTest {

Contributor Author (sfc-gh-wtrefon):

Please think of more test cases if possible, ideally with an example :)
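
For illustration, an additional parameterized case might look roughly like this (a self-contained sketch exercising the textual-vs-other header mapping rather than the real mapper class, whose API may differ):

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.stream.Stream;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

class HeaderValueMappingSketchTest {

  private static final ObjectMapper MAPPER = new ObjectMapper();

  // Each argument pair: raw JSON header value -> expected mapped string
  static Stream<Arguments> headerValues() {
    return Stream.of(
        Arguments.of("\"text\"", "text"),        // textual node is unwrapped
        Arguments.of("42", "42"),                // numeric node is serialized as-is
        Arguments.of("{\"a\":1}", "{\"a\":1}")); // nested object is kept as compact JSON
  }

  @ParameterizedTest
  @MethodSource("headerValues")
  void shouldMapHeaderValueToString(String rawJson, String expected) throws Exception {
    JsonNode node = MAPPER.readTree(rawJson);
    String actual = node.isTextual() ? node.asText() : MAPPER.writeValueAsString(node);
    assertEquals(expected, actual);
  }
}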

Contributor:

Looks exhaustive to me, let's wait for the bugs to happen :)


@Override
public Map<String, Object> processSnowflakeRecord(
    SnowflakeTableRow row, boolean schematizationEnabled, boolean includeAllMetadata)

Contributor:

nit: maybe just boolean includeMetadata?

String key = fields.next();
JsonNode valueNode = headersNode.get(key);
String value;
if (valueNode.isTextual()) {

Contributor:

nit: extract String getTextualValue(JsonNode node) and reuse in SnowflakeTableStreamingRecordMapper?
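
A minimal sketch of the suggested helper (assuming Jackson's JsonNode; the non-textual branch falling back to JSON serialization is an assumption about the existing behaviour):

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

final class JsonNodeUtilsSketch {

  private static final ObjectMapper MAPPER = new ObjectMapper();

  // Unwrap textual nodes; serialize everything else back to its JSON representation.
  static String getTextualValue(JsonNode node) throws JsonProcessingException {
    return node.isTextual() ? node.asText() : MAPPER.writeValueAsString(node);
  }
}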

this.clock = clock;
this.enableSchematization = enableSchematization;

Contributor:

I wonder if RecordService needs to be aware of the enableSchematization value after creating the StreamingRecordMapper abstraction. In fact, we could pass this flag to the mapper in a factory instead of as a processSnowflakeRecord argument.
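
Something along these lines, perhaps (a hypothetical sketch, not the PR's actual classes): the flag is fixed at construction time by a factory, so callers never pass it per record:

import java.util.HashMap;
import java.util.Map;

interface StreamingRecordMapperSketch {
  Map<String, Object> processSnowflakeRecord(Map<String, Object> row, boolean includeMetadata);
}

final class StreamingRecordMapperFactorySketch {

  static StreamingRecordMapperSketch create(boolean enableSchematization) {
    // enableSchematization is captured once here instead of being passed on every call
    return (row, includeMetadata) -> {
      Map<String, Object> out =
          enableSchematization
              ? new HashMap<>(row)                                        // one column per field
              : new HashMap<>(Map.of("RECORD_CONTENT", row.toString())); // single content column
      if (includeMetadata) {
        out.put("RECORD_METADATA", "<serialized metadata>"); // placeholder, illustrative only
      }
      return out;
    };
  }
}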

Contributor Author (sfc-gh-wtrefon):

good idea

@@ -67,8 +123,7 @@ public static List<RecordWithMetadata<PrimitiveJsonRecord>> fromSchematizedResul
resultSet.getLong("ID_INT8"),
resultSet.getLong("ID_INT16"),
resultSet.getLong("ID_INT32"),
// FIXME: there is currently some bug in Iceberg when storing int64 values
// resultSet.getLong("ID_INT64"),
resultSet.getLong("ID_INT64"),

sfc-gh-mbobowski (Contributor) left a comment:
Left some minor comments but great change overall!

sfc-gh-wtrefon marked this pull request as ready for review on October 24, 2024 at 13:43
sfc-gh-wtrefon requested a review from a team as a code owner on October 24, 2024 at 13:43
sfc-gh-wtrefon merged commit 8a7a305 into master on Oct 25, 2024
53 of 54 checks passed
sfc-gh-wtrefon deleted the wtrefon/SNOW-1737840-record-service-iceberg-mapping branch on October 25, 2024 at 06:56