Skip to content

Commit

Permalink
manual formatting
Browse files — browse the repository at this point in the history
  • Loading branch information
sfc-gh-rcheng committed Oct 18, 2023
1 parent 53db470 commit d61d160
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ public class TopicPartitionChannel {
* <li>If channel fails to fetch offsetToken from Snowflake, we reopen the channel and try to
* fetch offset from Snowflake again
* <li>If channel fails to ingest a buffer(Buffer containing rows/offsets), we reopen the
* channel and try to fetch offset from Snowflake again. Schematization purposefully fails the first buffer insert in order to alter the table, and then expects Kafka to resend data
* channel and try to fetch offset from Snowflake again. Schematization purposefully fails
* the first buffer insert in order to alter the table, and then expects Kafka to resend
* data
* </ol>
*
* <p>In both cases above, we ask Kafka to send back offsets, strictly from offset number after
Expand Down Expand Up @@ -398,12 +400,15 @@ private boolean shouldIgnoreAddingRecordToBuffer(
// channel
if (!isOffsetResetInKafka
&& currentProcessedOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
LOGGER.debug("No offset registered in Snowflake and offset is not being reset, we can add this offset to buffer for channel:{}", currentProcessedOffset);
LOGGER.debug(
"No offset registered in Snowflake and offset is not being reset, we can add this offset"
+ " to buffer for channel:{}",
currentProcessedOffset);
return false;
}

// Don't ignore if we see the expected offset; otherwise log and skip
if ((kafkaSinkRecord.kafkaOffset() - currentProcessedOffset ) == 1L) {
if ((kafkaSinkRecord.kafkaOffset() - currentProcessedOffset) == 1L) {
LOGGER.debug(
"Got the desired offset:{} from Kafka, we can add this offset to buffer for channel:{}",
kafkaSinkRecord.kafkaOffset(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import net.snowflake.ingest.internal.apache.commons.math3.analysis.function.Sin;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,8 @@ public void testInsertRows_SuccessAfterReopenChannel() throws Exception {
int expectedOpenChannelCount = 0;
int expectedGetOffsetCount = 0;

// setup mocks to fail first insert and return two null snowflake offsets (open channel and failed insert) before succeeding
// setup mocks to fail first insert and return two null snowflake offsets (open channel and
// failed insert) before succeeding
Mockito.when(
mockStreamingChannel.insertRows(
ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class)))
Expand Down Expand Up @@ -453,13 +454,13 @@ public void testInsertRows_SuccessAfterReopenChannel() throws Exception {
.getLatestCommittedOffsetToken();

// Now, it should be successful
// Mockito.when(
// mockStreamingChannel.insertRows(
// ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class)))
// .thenReturn(new InsertValidationResponse());
//
// Mockito.when(mockStreamingChannel.getLatestCommittedOffsetToken())
// .thenReturn(Long.toString(noOfRecords - 1));
// Mockito.when(
// mockStreamingChannel.insertRows(
// ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class)))
// .thenReturn(new InsertValidationResponse());
//
// Mockito.when(mockStreamingChannel.getLatestCommittedOffsetToken())
// .thenReturn(Long.toString(noOfRecords - 1));

// Retry the insert again, now everything should be ingested and the offset token should be
// noOfRecords-1
Expand Down

0 comments on commit d61d160

Please sign in to comment.