SNOW-1831140 E2e tests of Iceberg JSON ingestion (#1013)
sfc-gh-mbobowski authored Dec 3, 2024
1 parent 85db567 commit c16d13d
Showing 11 changed files with 88 additions and 37 deletions.
@@ -337,7 +337,7 @@ public void insertRecord(SinkRecord kafkaSinkRecord, boolean isFirstRowPerPartit
     // Simply skip inserting into the buffer if the row should be ignored after channel reset
     if (needToSkipCurrentBatch) {
       LOGGER.info(
-          "Ignore adding offset:{} to buffer for channel:{} because we recently reset offset in"
+          "Ignore inserting offset:{} for channel:{} because we recently reset offset in"
              + " Kafka. currentProcessedOffset:{}",
          kafkaSinkRecord.kafkaOffset(),
          this.getChannelNameFormatV1(),
3 changes: 2 additions & 1 deletion test/rest_request_template/iceberg_avro_aws.json
@@ -22,6 +22,7 @@
     "errors.deadletterqueue.topic.name": "DLQ_TOPIC",
     "errors.deadletterqueue.topic.replication.factor": 1,
     "snowflake.streaming.iceberg.enabled": true,
-    "snowflake.streaming.max.client.lag": "1"
+    "snowflake.streaming.max.client.lag": "1",
+    "snowflake.streaming.enable.single.buffer": true
   }
 }
3 changes: 2 additions & 1 deletion test/rest_request_template/iceberg_json_aws.json
@@ -21,6 +21,7 @@
     "errors.deadletterqueue.topic.name": "DLQ_TOPIC",
     "errors.deadletterqueue.topic.replication.factor": 1,
     "snowflake.streaming.iceberg.enabled": true,
-    "snowflake.streaming.max.client.lag": "1"
+    "snowflake.streaming.max.client.lag": "1",
+    "snowflake.streaming.enable.single.buffer": true
   }
 }
@@ -23,6 +23,7 @@
     "errors.deadletterqueue.topic.replication.factor": 1,
     "snowflake.streaming.iceberg.enabled": true,
     "snowflake.enable.schematization": true,
-    "snowflake.streaming.max.client.lag": "1"
+    "snowflake.streaming.max.client.lag": "1",
+    "snowflake.streaming.enable.single.buffer": true
   }
 }
@@ -22,6 +22,7 @@
     "errors.deadletterqueue.topic.replication.factor": 1,
     "snowflake.streaming.iceberg.enabled": true,
     "snowflake.enable.schematization": true,
-    "snowflake.streaming.max.client.lag": "1"
+    "snowflake.streaming.max.client.lag": "1",
+    "snowflake.streaming.enable.single.buffer": true
   }
 }
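All four request templates touched above receive the same two settings: a one-second snowflake.streaming.max.client.lag, so ingested rows become visible quickly enough for the e2e assertions, and snowflake.streaming.enable.single.buffer to exercise the single-buffer streaming path. A minimal sketch of how a test could sanity-check one of these templates; the top-level "config" key follows the usual Kafka Connect REST payload shape and is an assumption, not something shown in this diff:

import json

# Assumed path, mirroring the file names in this commit.
TEMPLATE = "test/rest_request_template/iceberg_json_aws.json"

with open(TEMPLATE) as f:
    payload = json.load(f)

config = payload["config"]  # assumed REST payload shape
assert config["snowflake.streaming.iceberg.enabled"] is True
assert config["snowflake.streaming.max.client.lag"] == "1"
assert config["snowflake.streaming.enable.single.buffer"] is True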
57 changes: 54 additions & 3 deletions test/test_suit/base_iceberg_test.py
@@ -1,11 +1,13 @@
 from test_suit.base_e2e import BaseE2eTest
 from test_suit.assertions import *
+from test_suit.test_utils import RetryableError, NonRetryableError
+import json

 class BaseIcebergTest(BaseE2eTest):

     def __init__(self, driver, name_salt: str, config_file_name: str):
         self.driver = driver
-        self.fileName = "iceberg_json_aws"
+        self.file_name = config_file_name
         self.topic = config_file_name + name_salt

         self.test_message_from_docs = {
@@ -88,7 +90,31 @@ def clean(self):
         self.driver.drop_iceberg_table(self.topic)


-    def verify_iceberg_content(self, content: dict):
+    def _send_json_values(self, msg: dict, number_of_messages: int):
+        json_msg = json.dumps(msg)
+
+        key = [json.dumps({"number": str(e)}).encode("utf-8") for e in range(number_of_messages)]
+        value = [json_msg.encode("utf-8") for _ in range(number_of_messages)]
+
+        self.driver.sendBytesData(
+            topic=self.topic,
+            value=value,
+            key=key,
+            partition=0,
+            headers=self.test_headers,
+        )
+
+    def _assert_number_of_records_in_table(self, expected_number_of_records: int):
+        number_of_records = self.driver.select_number_of_records(self.topic)
+        if number_of_records == 0:
+            raise RetryableError()
+        elif number_of_records != expected_number_of_records:
+            raise NonRetryableError(
+                "Number of records in table is different from number of records sent"
+            )
+
+
+    def _verify_iceberg_content_from_docs(self, content: dict):
         assert_equals(1, content['id'])
         assert_equals_with_precision(36.6, content['body_temperature'])
         assert_equals('Steve', content['name'])
@@ -102,11 +128,36 @@ def verify_iceberg_content(self, content: dict):
         assert_equals(False, content['animals_possessed']['cats'])


-    def verify_iceberg_metadata(self, metadata: dict):
+    def _verify_iceberg_metadata(self, metadata: dict):
         assert_equals(0, metadata['offset'])
         assert_equals(0, metadata['partition'])
         assert_starts_with('iceberg_', metadata['topic'])
         assert_not_null(metadata['SnowflakeConnectorPushTime'])

         assert_dict_contains('header1', 'value1', metadata['headers'])


+    def _select_schematized_record_with_offset(self, offset: int) -> dict:
+        record = (
+            self.driver.snowflake_conn.cursor()
+            .execute("select id, body_temperature, name, approved_coffee_types, animals_possessed, null_long, null_array, null_object, empty_array, some_object from {} limit 1 offset {}".format(self.topic, offset))
+            .fetchone()
+        )
+
+        return {
+            "id": record[0],
+            "body_temperature": record[1],
+            "name": record[2],
+            "approved_coffee_types": self.__none_or_json_loads(record[3]),
+            "animals_possessed": self.__none_or_json_loads(record[4]),
+            "null_long": record[5],
+            "null_array": self.__none_or_json_loads(record[6]),
+            "null_object": self.__none_or_json_loads(record[7]),
+            "empty_array": self.__none_or_json_loads(record[8]),
+            "some_object": self.__none_or_json_loads(record[9])
+        }
+
+
+    def __none_or_json_loads(self, value: str) -> dict:
+        return None if value is None else json.loads(value)

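With the shared helpers above, a concrete suite only needs to choose a template name and its message fixtures. A sketch of the intended subclass shape; the class name is illustrative, while send/verify are the hooks the harness calls, as the suites below show:

class IcebergJsonExampleTest(BaseIcebergTest):

    def __init__(self, driver, name_salt: str):
        BaseIcebergTest.__init__(self, driver, name_salt, "iceberg_json_aws")

    def send(self):
        # Publish 100 copies of the docs-based message to partition 0.
        self._send_json_values(self.test_message_from_docs, 100)

    def verify(self, round):
        # RetryableError makes the harness poll until rows arrive;
        # a count mismatch raises NonRetryableError and fails the run.
        self._assert_number_of_records_in_table(100)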
4 changes: 2 additions & 2 deletions test/test_suit/iceberg_avro_aws.py
@@ -45,5 +45,5 @@ def verify(self, round):
             .fetchone()
         )

-        self.verify_iceberg_content(json.loads(first_record[0]))
-        self.verify_iceberg_metadata(json.loads(first_record[1]))
+        self._verify_iceberg_content_from_docs(json.loads(first_record[0]))
+        self._verify_iceberg_metadata(json.loads(first_record[1]))
28 changes: 4 additions & 24 deletions test/test_suit/iceberg_json_aws.py
@@ -16,37 +16,17 @@ def setup(self):


     def send(self):
-        msg = json.dumps(self.test_message)
-
-        key = []
-        value = []
-        for e in range(100):
-            key.append(json.dumps({"number": str(e)}).encode("utf-8"))
-            value.append(msg.encode("utf-8"))
-
-        self.driver.sendBytesData(
-            topic=self.topic,
-            value=value,
-            key=key,
-            partition=0,
-            headers=self.test_headers,
-        )
+        self._send_json_values(self.test_message_from_docs, 100)


     def verify(self, round):
-        number_of_records = self.driver.select_number_of_records(self.topic)
-        if number_of_records == 0:
-            raise RetryableError()
-        elif number_of_records != 100:
-            raise NonRetryableError(
-                "Number of record in table is different from number of record sent"
-            )
+        self._assert_number_of_records_in_table(100)

         first_record = (
             self.driver.snowflake_conn.cursor()
             .execute("Select * from {} limit 1".format(self.topic))
             .fetchone()
         )

-        self.verify_iceberg_content(json.loads(first_record[0]))
-        self.verify_iceberg_metadata(json.loads(first_record[1]))
+        self._verify_iceberg_content_from_docs(json.loads(first_record[0]))
+        self._verify_iceberg_metadata(json.loads(first_record[1]))
1 change: 1 addition & 0 deletions test/test_suit/iceberg_schema_evolution_avro_aws.py
@@ -11,6 +11,7 @@ def setup(self):
             table_name=self.topic,
             external_volume="kafka_push_e2e_volume_aws",  # volume created manually
         )
+        self.driver.enable_schema_evolution_for_iceberg(self.topic)


     def send(self):
16 changes: 14 additions & 2 deletions test/test_suit/iceberg_schema_evolution_json_aws.py
@@ -11,12 +11,24 @@ def setup(self):
             table_name=self.topic,
             external_volume="kafka_push_e2e_volume_aws",  # volume created manually
         )
+        self.driver.enable_schema_evolution_for_iceberg(self.topic)


     def send(self):
-        pass
+        self._send_json_values(self.test_message_from_docs, 100)
+        self._send_json_values(self.test_message_for_schema_evolution_1, 100)
+        # TODO SNOW-1731264
+        # net.snowflake.ingest.utils.SFException: The given row cannot be converted to the internal format: Object of type java.util.LinkedHashMap cannot be ingested into Snowflake column NULL_OBJECT of type STRING, rowIndex:0. Allowed Java types: String, Number, boolean, char
+        # self._send_json_values(self.test_message_for_schema_evolution_2, 100)


     def verify(self, round):
-        pass
+        self._assert_number_of_records_in_table(200)
+
+        actual_record_from_docs_dict = self._select_schematized_record_with_offset(1)
+        actual_record_for_schema_evolution_1 = self._select_schematized_record_with_offset(100)
+        # TODO SNOW-1731264
+        # actual_record_for_schema_evolution_2 = self._select_schematized_record_with_offset(200)
+
+        print(actual_record_from_docs_dict)
+        self._verify_iceberg_content_from_docs(actual_record_from_docs_dict)
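The offsets passed to _select_schematized_record_with_offset follow from the send order: rows 0-99 hold the docs message and rows 100-199 the first schema evolution message, so offsets 1 and 100 sample one row from each batch. A sketch of that arithmetic, assuming "limit 1 offset N" reads rows in ingestion order (which the helper relies on):

BATCH_SIZE = 100  # each _send_json_values call above publishes 100 messages

def batch_index(offset: int) -> int:
    # Row offsets 0..99 land in batch 0, 100..199 in batch 1, and so on.
    return offset // BATCH_SIZE

assert batch_index(1) == 0    # test_message_from_docs
assert batch_index(100) == 1  # test_message_for_schema_evolution_1
# batch_index(200) would read the batch disabled by SNOW-1731264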
5 changes: 4 additions & 1 deletion test/test_verify.py
@@ -296,10 +296,13 @@ def cleanTableStagePipe(self, connectorName, topicName="", partitionNumber=1):

         print(datetime.now().strftime("%H:%M:%S "), "=== Done ===", flush=True)

+    def enable_schema_evolution_for_iceberg(self, table: str):
+        self.snowflake_conn.cursor().execute("alter iceberg table {} set ENABLE_SCHEMA_EVOLUTION = true".format(table))
+
     def create_empty_iceberg_table(self, table_name: str, external_volume: str):
         sql = """
             CREATE ICEBERG TABLE IF NOT EXISTS {} (
-                record_content OBJECT()
+                record_metadata OBJECT()
             )
             EXTERNAL_VOLUME = '{}'
             CATALOG = 'SNOWFLAKE'
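Together, the two helpers give the schema evolution suites their starting state: an Iceberg table holding only record_metadata, with Snowflake then allowed to add data columns as schematized rows arrive. A sketch of the combined setup flow under that assumption; the wrapper name is hypothetical:

def prepare_schema_evolution_table(driver, topic: str):
    # Start from a metadata-only Iceberg table...
    driver.create_empty_iceberg_table(
        table_name=topic,
        external_volume="kafka_push_e2e_volume_aws",
    )
    # ...then let schema evolution add the data columns on ingest.
    driver.enable_schema_evolution_for_iceberg(topic)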
