-
Notifications
You must be signed in to change notification settings - Fork 99
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
NO-SNOW Improve offset logging for Snowpipe #1027
Conversation
src/main/java/com/snowflake/kafka/connector/internal/OffsetContinuityRanges.java
Outdated
Show resolved
Hide resolved
src/test/java/com/snowflake/kafka/connector/internal/FileNameUtilsTest.java
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/FileNameUtils.java
Outdated
Show resolved
Hide resolved
import org.apache.commons.lang3.tuple.Pair; | ||
|
||
public class OffsetContinuityRanges { | ||
private final List<Pair<Long, Long>> continuousOffsets; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we could also introduce a class "OffsetRange" with startOffsetInclusive, endOffsetExclusive but I'm fine with a pair
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or a data class... oh no wait, it's not a Kotlin.
I think that Pair is good enough for this use case.
Arguments.of(Collections.singletonList(fileName(filePrefix, 0, 10)), "[[0,10]]", "[]"), | ||
Arguments.of( | ||
Arrays.asList(fileName(filePrefix, 0, 10), fileName(filePrefix, 100, 2137)), | ||
"[[0,10][100,2137]]", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✝️
LOGGER.debug( | ||
"Moving failed files for pipe:{} to tableStage failedFileCount:{}, failedFiles:{}", | ||
OffsetContinuityRanges offsets = searchForMissingOffsets(failedFiles); | ||
LOGGER.info( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could we also print failedFiles on debug lvl? The original PR had that and I think this can be useful to know the prefixes when debbuging some cases
src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
Outdated
Show resolved
Hide resolved
…SinkServiceV1.java Co-authored-by: Dariusz Seweryn <[email protected]>
Co-authored-by: Wojciech Trefon <[email protected]>
Co-authored-by: Michał Bobowski <[email protected]> Co-authored-by: Wojciech Trefon <[email protected]>
Overview
Refactored from:
#1023
Pre-review checklist
snowflake.ingestion.method
.Yes
- Added end to end and Unit Tests.No
- Suggest why it is not param protected