From b9f89c5169f6acf5b569de4b9b7557ea93e40cbd Mon Sep 17 00:00:00 2001 From: wgcn <1026688210@qq.com> Date: Fri, 7 Jun 2024 10:56:56 +0800 Subject: [PATCH 1/3] fix --- .../org/apache/paimon/data/BinaryWriter.java | 54 +++++++++++++++++++ .../serializer/InternalRowSerializer.java | 2 +- .../paimon/flink/ReadWriteTableITCase.java | 25 +++++++++ 3 files changed, 80 insertions(+), 1 deletion(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java b/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java index 266e409e8b80..e811d22d4169 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java @@ -209,6 +209,60 @@ static ValueSetter createValueSetter(DataType elementType) { } } + static ValueSetter createValueSetter(DataType elementType,Serializer serializer) { + // ordered by type root definition + switch (elementType.getTypeRoot()) { + case CHAR: + case VARCHAR: + return (writer, pos, value) -> writer.writeString(pos, (BinaryString) value); + case BOOLEAN: + return (writer, pos, value) -> writer.writeBoolean(pos, (boolean) value); + case BINARY: + case VARBINARY: + return (writer, pos, value) -> writer.writeBinary(pos, (byte[]) value); + case DECIMAL: + final int decimalPrecision = getPrecision(elementType); + return (writer, pos, value) -> + writer.writeDecimal(pos, (Decimal) value, decimalPrecision); + case TINYINT: + return (writer, pos, value) -> writer.writeByte(pos, (byte) value); + case SMALLINT: + return (writer, pos, value) -> writer.writeShort(pos, (short) value); + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + return (writer, pos, value) -> writer.writeInt(pos, (int) value); + case BIGINT: + return (writer, pos, value) -> writer.writeLong(pos, (long) value); + case FLOAT: + return (writer, pos, value) -> writer.writeFloat(pos, (float) value); + case DOUBLE: + return (writer, pos, value) -> writer.writeDouble(pos, (double) value); + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final int timestampPrecision = getPrecision(elementType); + return (writer, pos, value) -> + writer.writeTimestamp(pos, (Timestamp) value, timestampPrecision); + case ARRAY: + return (writer, pos, value) -> + writer.writeArray( + pos, + (InternalArray) value, + (InternalArraySerializer) serializer); + case MULTISET: + case MAP: + return (writer, pos, value) -> + writer.writeMap( + pos, (InternalMap) value, (InternalMapSerializer) serializer); + case ROW: + return (writer, pos, value) -> + writer.writeRow( + pos, (InternalRow) value, (InternalRowSerializer) serializer); + default: + throw new IllegalArgumentException(); + } + } + /** Accessor for setting the elements of a binary writer during runtime. */ interface ValueSetter extends Serializable { void setValue(BinaryWriter writer, int pos, Object value); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java index 8b4810f57b71..0271c2a51bc8 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java @@ -73,7 +73,7 @@ public InternalRowSerializer(DataType[] types, Serializer[] fieldSerializers) for (int i = 0; i < types.length; i++) { DataType type = types[i]; fieldGetters[i] = InternalRow.createFieldGetter(type, i); - valueSetters[i] = BinaryWriter.createValueSetter(type); + valueSetters[i] = BinaryWriter.createValueSetter(type,this); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java index 3fa9d4fb4033..24cef1dc569a 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java @@ -18,6 +18,8 @@ package org.apache.paimon.flink; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; import org.apache.paimon.CoreOptions; import org.apache.paimon.flink.sink.FlinkTableSink; import org.apache.paimon.flink.util.AbstractTestBase; @@ -1037,6 +1039,29 @@ public void testSourceParallelism() throws Exception { .isEqualTo(66); } + @Test + void testConvertRowType2Serializer() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = + StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().inBatchMode().build()); + tEnv.executeSql( + "CREATE CATALOG my_catalog WITH (\n" + + " 'type' = 'paimon',\n" + + " 'warehouse' = '" + + getTempDirPath() + + "'\n" + + ")"); + tEnv.executeSql("USE CATALOG my_catalog"); + tEnv.executeSql( + "CREATE TABLE tmp (\n" + + "execution\n" + + "ROW<`execution_server` STRING, `execution_insertion` ARRAY, `timing` ROW<`client_log_timestamp` BIGINT, `event_api_timestamp` BIGINT, `log_timestamp` BIGINT, `processing_timestamp` BIGINT>, `client_info` ROW<`client_type` STRING, `traffic_type` STRING>, `insertion_id` STRING, `request_id` STRING, `view_id` STRING, `auto_view_id` STRING, `session_id` STRING, `content_id` STRING, `position` BIGINT, `properties` ROW<`struct` ROW<`fields` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `struct_json` STRING>, `feature_stage` ROW<`features` ROW<`numeric` ARRAY NOT NULL>, `categorical` ARRAY NOT NULL>, `sparse` ARRAY NOT NULL>, `sparse_id` ARRAY NOT NULL>, `embeddings` ARRAY>> NOT NULL>, `feature_references` ARRAY NOT NULL>, `sparse_id_list` ARRAY>> NOT NULL>, `user_events` ROW<`user_events` ARRAY NOT NULL>, `user_events_all` ARRAY NOT NULL>>, `string_features` ARRAY NOT NULL>>>, `personalize_stage` ROW<`personalize_ranking_score` FLOAT, `score_source` STRING, `personalize_ranking_scores` ARRAY> NOT NULL>, `model_scores` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `predictor_stage` ROW<`model_scores` ARRAY NOT NULL>, `backoff_predictors` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `blender_stage` ROW<`score` FLOAT, `steps` ARRAY, `boost_step` ROW<`fid` BIGINT, `delta` FLOAT>> NOT NULL>, `sort_key` ARRAY, `experiments` ARRAY NOT NULL>>, `retrieval_rank` BIGINT, `retrieval_score` FLOAT> NOT NULL>, `latency` ARRAY NOT NULL>, `execution_stats` ROW<`stages` ARRAY NOT NULL>>> NOT NULL>>, `request_feature_stage` ROW<`features` ROW<`numeric` ARRAY NOT NULL>, `categorical` ARRAY NOT NULL>, `sparse` ARRAY NOT NULL>, `sparse_id` ARRAY NOT NULL>, `embeddings` ARRAY>> NOT NULL>, `feature_references` ARRAY NOT NULL>, `sparse_id_list` ARRAY>> NOT NULL>, `user_events` ROW<`user_events` ARRAY NOT NULL>, `user_events_all` ARRAY NOT NULL>>, `string_features` ARRAY NOT NULL>>>, `user_feature_stage` ROW<`features` ROW<`numeric` ARRAY NOT NULL>, `categorical` ARRAY NOT NULL>, `sparse` ARRAY NOT NULL>, `sparse_id` ARRAY NOT NULL>, `embeddings` ARRAY>> NOT NULL>, `feature_references` ARRAY NOT NULL>, `sparse_id_list` ARRAY>> NOT NULL>, `user_events` ROW<`user_events` ARRAY NOT NULL>, `user_events_all` ARRAY NOT NULL>>, `string_features` ARRAY NOT NULL>>>, `model_ref` ARRAY NOT NULL>, `server_version` STRING, `after_response_stage` ROW<`removed_execution_insertion_count` INT>, `personalize_stage` ROW<`personalize_ranking_score` FLOAT, `score_source` STRING, `personalize_ranking_scores` ARRAY> NOT NULL>, `model_scores` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `predictor_stage` ROW<`model_scores` ARRAY NOT NULL>, `backoff_predictors` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `blender_config` STRING, `hyperloop_log` ROW<`parameter_logs` ARRAY> NOT NULL>>, `blender_session_log` ROW<`config_statements` ARRAY, `ids` ARRAY, `variable_logs` ARRAY> NOT NULL>, `allocation_logs` ARRAY, `name` STRING, `positions_considered` ARRAY, `positions_filled` ARRAY> NOT NULL>>, `experiments` ARRAY NOT NULL>, `effective_user_info` ROW<`user_id` STRING, `log_user_id` STRING, `is_internal_user` BOOLEAN, `ignore_usage` BOOLEAN, `anon_user_id` STRING, `retained_user_id` STRING>>);"); + assertThatThrownBy(()-> + tEnv.executeSql( + "INSERT INTO tmp VALUES (CAST(NULL AS ROW<`execution_server` STRING, `execution_insertion` ARRAY, `timing` ROW<`client_log_timestamp` BIGINT, `event_api_timestamp` BIGINT, `log_timestamp` BIGINT, `processing_timestamp` BIGINT>, `client_info` ROW<`client_type` STRING, `traffic_type` STRING>, `insertion_id` STRING, `request_id` STRING, `view_id` STRING, `auto_view_id` STRING, `session_id` STRING, `content_id` STRING, `position` BIGINT, `properties` ROW<`struct` ROW<`fields` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `struct_json` STRING>, `feature_stage` ROW<`features` ROW<`numeric` ARRAY NOT NULL>, `categorical` ARRAY NOT NULL>, `sparse` ARRAY NOT NULL>, `sparse_id` ARRAY NOT NULL>, `embeddings` ARRAY>> NOT NULL>, `feature_references` ARRAY NOT NULL>, `sparse_id_list` ARRAY>> NOT NULL>, `user_events` ROW<`user_events` ARRAY NOT NULL>, `user_events_all` ARRAY NOT NULL>>, `string_features` ARRAY NOT NULL>>>, `personalize_stage` ROW<`personalize_ranking_score` FLOAT, `score_source` STRING, `personalize_ranking_scores` ARRAY> NOT NULL>, `model_scores` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `predictor_stage` ROW<`model_scores` ARRAY NOT NULL>, `backoff_predictors` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `blender_stage` ROW<`score` FLOAT, `steps` ARRAY, `boost_step` ROW<`fid` BIGINT, `delta` FLOAT>> NOT NULL>, `sort_key` ARRAY, `experiments` ARRAY NOT NULL>>, `retrieval_rank` BIGINT, `retrieval_score` FLOAT> NOT NULL>, `latency` ARRAY NOT NULL>, `execution_stats` ROW<`stages` ARRAY NOT NULL>>> NOT NULL>>, `request_feature_stage` ROW<`features` ROW<`numeric` ARRAY NOT NULL>, `categorical` ARRAY NOT NULL>, `sparse` ARRAY NOT NULL>, `sparse_id` ARRAY NOT NULL>, `embeddings` ARRAY>> NOT NULL>, `feature_references` ARRAY NOT NULL>, `sparse_id_list` ARRAY>> NOT NULL>, `user_events` ROW<`user_events` ARRAY NOT NULL>, `user_events_all` ARRAY NOT NULL>>, `string_features` ARRAY NOT NULL>>>, `user_feature_stage` ROW<`features` ROW<`numeric` ARRAY NOT NULL>, `categorical` ARRAY NOT NULL>, `sparse` ARRAY NOT NULL>, `sparse_id` ARRAY NOT NULL>, `embeddings` ARRAY>> NOT NULL>, `feature_references` ARRAY NOT NULL>, `sparse_id_list` ARRAY>> NOT NULL>, `user_events` ROW<`user_events` ARRAY NOT NULL>, `user_events_all` ARRAY NOT NULL>>, `string_features` ARRAY NOT NULL>>>, `model_ref` ARRAY NOT NULL>, `server_version` STRING, `after_response_stage` ROW<`removed_execution_insertion_count` INT>, `personalize_stage` ROW<`personalize_ranking_score` FLOAT, `score_source` STRING, `personalize_ranking_scores` ARRAY> NOT NULL>, `model_scores` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `predictor_stage` ROW<`model_scores` ARRAY NOT NULL>, `backoff_predictors` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `blender_config` STRING, `hyperloop_log` ROW<`parameter_logs` ARRAY> NOT NULL>>, `blender_session_log` ROW<`config_statements` ARRAY, `ids` ARRAY, `variable_logs` ARRAY> NOT NULL>, `allocation_logs` ARRAY, `name` STRING, `positions_considered` ARRAY, `positions_filled` ARRAY> NOT NULL>>, `experiments` ARRAY NOT NULL>, `effective_user_info` ROW<`user_id` STRING, `log_user_id` STRING, `is_internal_user` BOOLEAN, `ignore_usage` BOOLEAN, `anon_user_id` STRING, `retained_user_id` STRING>>))") + .print() ).doesNotThrowAnyException(); + } + @Test public void testInferParallelism() throws Exception { String table = From ecf7809475a115fa09ed2d7808e8f473175afed8 Mon Sep 17 00:00:00 2001 From: wgcn <1026688210@qq.com> Date: Fri, 7 Jun 2024 10:57:46 +0800 Subject: [PATCH 2/3] fix --- .../org/apache/paimon/data/BinaryWriter.java | 6 ++---- .../serializer/InternalRowSerializer.java | 2 +- .../paimon/flink/ReadWriteTableITCase.java | 19 +++++++++++-------- 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java b/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java index e811d22d4169..caee792a5526 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java @@ -209,7 +209,7 @@ static ValueSetter createValueSetter(DataType elementType) { } } - static ValueSetter createValueSetter(DataType elementType,Serializer serializer) { + static ValueSetter createValueSetter(DataType elementType, Serializer serializer) { // ordered by type root definition switch (elementType.getTypeRoot()) { case CHAR: @@ -246,9 +246,7 @@ static ValueSetter createValueSetter(DataType elementType,Serializer serializ case ARRAY: return (writer, pos, value) -> writer.writeArray( - pos, - (InternalArray) value, - (InternalArraySerializer) serializer); + pos, (InternalArray) value, (InternalArraySerializer) serializer); case MULTISET: case MAP: return (writer, pos, value) -> diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java index 0271c2a51bc8..6eefacb86881 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java @@ -73,7 +73,7 @@ public InternalRowSerializer(DataType[] types, Serializer[] fieldSerializers) for (int i = 0; i < types.length; i++) { DataType type = types[i]; fieldGetters[i] = InternalRow.createFieldGetter(type, i); - valueSetters[i] = BinaryWriter.createValueSetter(type,this); + valueSetters[i] = BinaryWriter.createValueSetter(type, this); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java index 24cef1dc569a..38aa60f53ef8 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java @@ -18,8 +18,6 @@ package org.apache.paimon.flink; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; import org.apache.paimon.CoreOptions; import org.apache.paimon.flink.sink.FlinkTableSink; import org.apache.paimon.flink.util.AbstractTestBase; @@ -37,7 +35,9 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.transformations.PartitionTransformation; +import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.connector.sink.DataStreamSinkProvider; @@ -1043,12 +1043,13 @@ public void testSourceParallelism() throws Exception { void testConvertRowType2Serializer() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = - StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().inBatchMode().build()); + StreamTableEnvironment.create( + env, EnvironmentSettings.newInstance().inBatchMode().build()); tEnv.executeSql( "CREATE CATALOG my_catalog WITH (\n" + " 'type' = 'paimon',\n" + " 'warehouse' = '" - + getTempDirPath() + + getTempDirPath() + "'\n" + ")"); tEnv.executeSql("USE CATALOG my_catalog"); @@ -1056,10 +1057,12 @@ void testConvertRowType2Serializer() throws Exception { "CREATE TABLE tmp (\n" + "execution\n" + "ROW<`execution_server` STRING, `execution_insertion` ARRAY, `timing` ROW<`client_log_timestamp` BIGINT, `event_api_timestamp` BIGINT, `log_timestamp` BIGINT, `processing_timestamp` BIGINT>, `client_info` ROW<`client_type` STRING, `traffic_type` STRING>, `insertion_id` STRING, `request_id` STRING, `view_id` STRING, `auto_view_id` STRING, `session_id` STRING, `content_id` STRING, `position` BIGINT, `properties` ROW<`struct` ROW<`fields` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `struct_json` STRING>, `feature_stage` ROW<`features` ROW<`numeric` ARRAY NOT NULL>, `categorical` ARRAY NOT NULL>, `sparse` ARRAY NOT NULL>, `sparse_id` ARRAY NOT NULL>, `embeddings` ARRAY>> NOT NULL>, `feature_references` ARRAY NOT NULL>, `sparse_id_list` ARRAY>> NOT NULL>, `user_events` ROW<`user_events` ARRAY NOT NULL>, `user_events_all` ARRAY NOT NULL>>, `string_features` ARRAY NOT NULL>>>, `personalize_stage` ROW<`personalize_ranking_score` FLOAT, `score_source` STRING, `personalize_ranking_scores` ARRAY> NOT NULL>, `model_scores` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `predictor_stage` ROW<`model_scores` ARRAY NOT NULL>, `backoff_predictors` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `blender_stage` ROW<`score` FLOAT, `steps` ARRAY, `boost_step` ROW<`fid` BIGINT, `delta` FLOAT>> NOT NULL>, `sort_key` ARRAY, `experiments` ARRAY NOT NULL>>, `retrieval_rank` BIGINT, `retrieval_score` FLOAT> NOT NULL>, `latency` ARRAY NOT NULL>, `execution_stats` ROW<`stages` ARRAY NOT NULL>>> NOT NULL>>, `request_feature_stage` ROW<`features` ROW<`numeric` ARRAY NOT NULL>, `categorical` ARRAY NOT NULL>, `sparse` ARRAY NOT NULL>, `sparse_id` ARRAY NOT NULL>, `embeddings` ARRAY>> NOT NULL>, `feature_references` ARRAY NOT NULL>, `sparse_id_list` ARRAY>> NOT NULL>, `user_events` ROW<`user_events` ARRAY NOT NULL>, `user_events_all` ARRAY NOT NULL>>, `string_features` ARRAY NOT NULL>>>, `user_feature_stage` ROW<`features` ROW<`numeric` ARRAY NOT NULL>, `categorical` ARRAY NOT NULL>, `sparse` ARRAY NOT NULL>, `sparse_id` ARRAY NOT NULL>, `embeddings` ARRAY>> NOT NULL>, `feature_references` ARRAY NOT NULL>, `sparse_id_list` ARRAY>> NOT NULL>, `user_events` ROW<`user_events` ARRAY NOT NULL>, `user_events_all` ARRAY NOT NULL>>, `string_features` ARRAY NOT NULL>>>, `model_ref` ARRAY NOT NULL>, `server_version` STRING, `after_response_stage` ROW<`removed_execution_insertion_count` INT>, `personalize_stage` ROW<`personalize_ranking_score` FLOAT, `score_source` STRING, `personalize_ranking_scores` ARRAY> NOT NULL>, `model_scores` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `predictor_stage` ROW<`model_scores` ARRAY NOT NULL>, `backoff_predictors` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `blender_config` STRING, `hyperloop_log` ROW<`parameter_logs` ARRAY> NOT NULL>>, `blender_session_log` ROW<`config_statements` ARRAY, `ids` ARRAY, `variable_logs` ARRAY> NOT NULL>, `allocation_logs` ARRAY, `name` STRING, `positions_considered` ARRAY, `positions_filled` ARRAY> NOT NULL>>, `experiments` ARRAY NOT NULL>, `effective_user_info` ROW<`user_id` STRING, `log_user_id` STRING, `is_internal_user` BOOLEAN, `ignore_usage` BOOLEAN, `anon_user_id` STRING, `retained_user_id` STRING>>);"); - assertThatThrownBy(()-> - tEnv.executeSql( - "INSERT INTO tmp VALUES (CAST(NULL AS ROW<`execution_server` STRING, `execution_insertion` ARRAY, `timing` ROW<`client_log_timestamp` BIGINT, `event_api_timestamp` BIGINT, `log_timestamp` BIGINT, `processing_timestamp` BIGINT>, `client_info` ROW<`client_type` STRING, `traffic_type` STRING>, `insertion_id` STRING, `request_id` STRING, `view_id` STRING, `auto_view_id` STRING, `session_id` STRING, `content_id` STRING, `position` BIGINT, `properties` ROW<`struct` ROW<`fields` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `struct_json` STRING>, `feature_stage` ROW<`features` ROW<`numeric` ARRAY NOT NULL>, `categorical` ARRAY NOT NULL>, `sparse` ARRAY NOT NULL>, `sparse_id` ARRAY NOT NULL>, `embeddings` ARRAY>> NOT NULL>, `feature_references` ARRAY NOT NULL>, `sparse_id_list` ARRAY>> NOT NULL>, `user_events` ROW<`user_events` ARRAY NOT NULL>, `user_events_all` ARRAY NOT NULL>>, `string_features` ARRAY NOT NULL>>>, `personalize_stage` ROW<`personalize_ranking_score` FLOAT, `score_source` STRING, `personalize_ranking_scores` ARRAY> NOT NULL>, `model_scores` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `predictor_stage` ROW<`model_scores` ARRAY NOT NULL>, `backoff_predictors` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `blender_stage` ROW<`score` FLOAT, `steps` ARRAY, `boost_step` ROW<`fid` BIGINT, `delta` FLOAT>> NOT NULL>, `sort_key` ARRAY, `experiments` ARRAY NOT NULL>>, `retrieval_rank` BIGINT, `retrieval_score` FLOAT> NOT NULL>, `latency` ARRAY NOT NULL>, `execution_stats` ROW<`stages` ARRAY NOT NULL>>> NOT NULL>>, `request_feature_stage` ROW<`features` ROW<`numeric` ARRAY NOT NULL>, `categorical` ARRAY NOT NULL>, `sparse` ARRAY NOT NULL>, `sparse_id` ARRAY NOT NULL>, `embeddings` ARRAY>> NOT NULL>, `feature_references` ARRAY NOT NULL>, `sparse_id_list` ARRAY>> NOT NULL>, `user_events` ROW<`user_events` ARRAY NOT NULL>, `user_events_all` ARRAY NOT NULL>>, `string_features` ARRAY NOT NULL>>>, `user_feature_stage` ROW<`features` ROW<`numeric` ARRAY NOT NULL>, `categorical` ARRAY NOT NULL>, `sparse` ARRAY NOT NULL>, `sparse_id` ARRAY NOT NULL>, `embeddings` ARRAY>> NOT NULL>, `feature_references` ARRAY NOT NULL>, `sparse_id_list` ARRAY>> NOT NULL>, `user_events` ROW<`user_events` ARRAY NOT NULL>, `user_events_all` ARRAY NOT NULL>>, `string_features` ARRAY NOT NULL>>>, `model_ref` ARRAY NOT NULL>, `server_version` STRING, `after_response_stage` ROW<`removed_execution_insertion_count` INT>, `personalize_stage` ROW<`personalize_ranking_score` FLOAT, `score_source` STRING, `personalize_ranking_scores` ARRAY> NOT NULL>, `model_scores` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `predictor_stage` ROW<`model_scores` ARRAY NOT NULL>, `backoff_predictors` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `blender_config` STRING, `hyperloop_log` ROW<`parameter_logs` ARRAY> NOT NULL>>, `blender_session_log` ROW<`config_statements` ARRAY, `ids` ARRAY, `variable_logs` ARRAY> NOT NULL>, `allocation_logs` ARRAY, `name` STRING, `positions_considered` ARRAY, `positions_filled` ARRAY> NOT NULL>>, `experiments` ARRAY NOT NULL>, `effective_user_info` ROW<`user_id` STRING, `log_user_id` STRING, `is_internal_user` BOOLEAN, `ignore_usage` BOOLEAN, `anon_user_id` STRING, `retained_user_id` STRING>>))") - .print() ).doesNotThrowAnyException(); + assertThatThrownBy( + () -> + tEnv.executeSql( + "INSERT INTO tmp VALUES (CAST(NULL AS ROW<`execution_server` STRING, `execution_insertion` ARRAY, `timing` ROW<`client_log_timestamp` BIGINT, `event_api_timestamp` BIGINT, `log_timestamp` BIGINT, `processing_timestamp` BIGINT>, `client_info` ROW<`client_type` STRING, `traffic_type` STRING>, `insertion_id` STRING, `request_id` STRING, `view_id` STRING, `auto_view_id` STRING, `session_id` STRING, `content_id` STRING, `position` BIGINT, `properties` ROW<`struct` ROW<`fields` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `struct_json` STRING>, `feature_stage` ROW<`features` ROW<`numeric` ARRAY NOT NULL>, `categorical` ARRAY NOT NULL>, `sparse` ARRAY NOT NULL>, `sparse_id` ARRAY NOT NULL>, `embeddings` ARRAY>> NOT NULL>, `feature_references` ARRAY NOT NULL>, `sparse_id_list` ARRAY>> NOT NULL>, `user_events` ROW<`user_events` ARRAY NOT NULL>, `user_events_all` ARRAY NOT NULL>>, `string_features` ARRAY NOT NULL>>>, `personalize_stage` ROW<`personalize_ranking_score` FLOAT, `score_source` STRING, `personalize_ranking_scores` ARRAY> NOT NULL>, `model_scores` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `predictor_stage` ROW<`model_scores` ARRAY NOT NULL>, `backoff_predictors` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `blender_stage` ROW<`score` FLOAT, `steps` ARRAY, `boost_step` ROW<`fid` BIGINT, `delta` FLOAT>> NOT NULL>, `sort_key` ARRAY, `experiments` ARRAY NOT NULL>>, `retrieval_rank` BIGINT, `retrieval_score` FLOAT> NOT NULL>, `latency` ARRAY NOT NULL>, `execution_stats` ROW<`stages` ARRAY NOT NULL>>> NOT NULL>>, `request_feature_stage` ROW<`features` ROW<`numeric` ARRAY NOT NULL>, `categorical` ARRAY NOT NULL>, `sparse` ARRAY NOT NULL>, `sparse_id` ARRAY NOT NULL>, `embeddings` ARRAY>> NOT NULL>, `feature_references` ARRAY NOT NULL>, `sparse_id_list` ARRAY>> NOT NULL>, `user_events` ROW<`user_events` ARRAY NOT NULL>, `user_events_all` ARRAY NOT NULL>>, `string_features` ARRAY NOT NULL>>>, `user_feature_stage` ROW<`features` ROW<`numeric` ARRAY NOT NULL>, `categorical` ARRAY NOT NULL>, `sparse` ARRAY NOT NULL>, `sparse_id` ARRAY NOT NULL>, `embeddings` ARRAY>> NOT NULL>, `feature_references` ARRAY NOT NULL>, `sparse_id_list` ARRAY>> NOT NULL>, `user_events` ROW<`user_events` ARRAY NOT NULL>, `user_events_all` ARRAY NOT NULL>>, `string_features` ARRAY NOT NULL>>>, `model_ref` ARRAY NOT NULL>, `server_version` STRING, `after_response_stage` ROW<`removed_execution_insertion_count` INT>, `personalize_stage` ROW<`personalize_ranking_score` FLOAT, `score_source` STRING, `personalize_ranking_scores` ARRAY> NOT NULL>, `model_scores` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `predictor_stage` ROW<`model_scores` ARRAY NOT NULL>, `backoff_predictors` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `blender_config` STRING, `hyperloop_log` ROW<`parameter_logs` ARRAY> NOT NULL>>, `blender_session_log` ROW<`config_statements` ARRAY, `ids` ARRAY, `variable_logs` ARRAY> NOT NULL>, `allocation_logs` ARRAY, `name` STRING, `positions_considered` ARRAY, `positions_filled` ARRAY> NOT NULL>>, `experiments` ARRAY NOT NULL>, `effective_user_info` ROW<`user_id` STRING, `log_user_id` STRING, `is_internal_user` BOOLEAN, `ignore_usage` BOOLEAN, `anon_user_id` STRING, `retained_user_id` STRING>>))") + .print()) + .doesNotThrowAnyException(); } @Test From c612384763d7ff24ddbaad573578eaeeaa53d6c9 Mon Sep 17 00:00:00 2001 From: wgcn <1026688210@qq.com> Date: Fri, 7 Jun 2024 14:04:54 +0800 Subject: [PATCH 3/3] fix --- .../org/apache/paimon/data/BinaryWriter.java | 76 +++++-------------- .../serializer/InternalRowSerializer.java | 2 +- .../paimon/flink/ReadWriteTableITCase.java | 8 +- 3 files changed, 23 insertions(+), 63 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java b/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java index caee792a5526..8c890be52381 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.serializer.InternalSerializers; import org.apache.paimon.data.serializer.Serializer; import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.types.DecimalType; import org.apache.paimon.types.LocalZonedTimestampType; import org.apache.paimon.types.TimestampType; @@ -150,6 +151,11 @@ static void write( * @param elementType the element type */ static ValueSetter createValueSetter(DataType elementType) { + return createValueSetter(elementType, null); + } + + static ValueSetter createValueSetter(DataType elementType, Serializer serializer) { + Serializer finalSerializer = createSerializerIfNeed(elementType, serializer); // ordered by type root definition switch (elementType.getTypeRoot()) { case CHAR: @@ -184,81 +190,35 @@ static ValueSetter createValueSetter(DataType elementType) { return (writer, pos, value) -> writer.writeTimestamp(pos, (Timestamp) value, timestampPrecision); case ARRAY: - final Serializer arraySerializer = - InternalSerializers.create(elementType); return (writer, pos, value) -> writer.writeArray( pos, (InternalArray) value, - (InternalArraySerializer) arraySerializer); + (InternalArraySerializer) finalSerializer); case MULTISET: case MAP: - final Serializer mapSerializer = - InternalSerializers.create(elementType); return (writer, pos, value) -> writer.writeMap( - pos, (InternalMap) value, (InternalMapSerializer) mapSerializer); + pos, (InternalMap) value, (InternalMapSerializer) finalSerializer); case ROW: - final Serializer rowSerializer = - InternalSerializers.create(elementType); return (writer, pos, value) -> writer.writeRow( - pos, (InternalRow) value, (InternalRowSerializer) rowSerializer); + pos, (InternalRow) value, (InternalRowSerializer) finalSerializer); default: throw new IllegalArgumentException(); } } - static ValueSetter createValueSetter(DataType elementType, Serializer serializer) { - // ordered by type root definition - switch (elementType.getTypeRoot()) { - case CHAR: - case VARCHAR: - return (writer, pos, value) -> writer.writeString(pos, (BinaryString) value); - case BOOLEAN: - return (writer, pos, value) -> writer.writeBoolean(pos, (boolean) value); - case BINARY: - case VARBINARY: - return (writer, pos, value) -> writer.writeBinary(pos, (byte[]) value); - case DECIMAL: - final int decimalPrecision = getPrecision(elementType); - return (writer, pos, value) -> - writer.writeDecimal(pos, (Decimal) value, decimalPrecision); - case TINYINT: - return (writer, pos, value) -> writer.writeByte(pos, (byte) value); - case SMALLINT: - return (writer, pos, value) -> writer.writeShort(pos, (short) value); - case INTEGER: - case DATE: - case TIME_WITHOUT_TIME_ZONE: - return (writer, pos, value) -> writer.writeInt(pos, (int) value); - case BIGINT: - return (writer, pos, value) -> writer.writeLong(pos, (long) value); - case FLOAT: - return (writer, pos, value) -> writer.writeFloat(pos, (float) value); - case DOUBLE: - return (writer, pos, value) -> writer.writeDouble(pos, (double) value); - case TIMESTAMP_WITHOUT_TIME_ZONE: - case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - final int timestampPrecision = getPrecision(elementType); - return (writer, pos, value) -> - writer.writeTimestamp(pos, (Timestamp) value, timestampPrecision); - case ARRAY: - return (writer, pos, value) -> - writer.writeArray( - pos, (InternalArray) value, (InternalArraySerializer) serializer); - case MULTISET: - case MAP: - return (writer, pos, value) -> - writer.writeMap( - pos, (InternalMap) value, (InternalMapSerializer) serializer); - case ROW: - return (writer, pos, value) -> - writer.writeRow( - pos, (InternalRow) value, (InternalRowSerializer) serializer); - default: - throw new IllegalArgumentException(); + static Serializer createSerializerIfNeed(DataType elementType, Serializer serializer) { + Serializer finalSerializer = serializer; + DataTypeRoot typeRoot = elementType.getTypeRoot(); + if (finalSerializer == null + && (typeRoot == DataTypeRoot.MAP + || typeRoot == DataTypeRoot.ROW + || typeRoot == DataTypeRoot.ARRAY)) { + finalSerializer = InternalSerializers.create(elementType); } + return finalSerializer; } /** Accessor for setting the elements of a binary writer during runtime. */ diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java index 6eefacb86881..97a5ae31d372 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java @@ -73,7 +73,7 @@ public InternalRowSerializer(DataType[] types, Serializer[] fieldSerializers) for (int i = 0; i < types.length; i++) { DataType type = types[i]; fieldGetters[i] = InternalRow.createFieldGetter(type, i); - valueSetters[i] = BinaryWriter.createValueSetter(type, this); + valueSetters[i] = BinaryWriter.createValueSetter(type, fieldSerializers[i]); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java index 38aa60f53ef8..d1e9b23e1b54 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java @@ -103,6 +103,7 @@ import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.warehouse; import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Paimon reading and writing IT cases. */ @@ -1040,7 +1041,7 @@ public void testSourceParallelism() throws Exception { } @Test - void testConvertRowType2Serializer() throws Exception { + void testConvertRowType2Serializer() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create( @@ -1057,11 +1058,10 @@ void testConvertRowType2Serializer() throws Exception { "CREATE TABLE tmp (\n" + "execution\n" + "ROW<`execution_server` STRING, `execution_insertion` ARRAY, `timing` ROW<`client_log_timestamp` BIGINT, `event_api_timestamp` BIGINT, `log_timestamp` BIGINT, `processing_timestamp` BIGINT>, `client_info` ROW<`client_type` STRING, `traffic_type` STRING>, `insertion_id` STRING, `request_id` STRING, `view_id` STRING, `auto_view_id` STRING, `session_id` STRING, `content_id` STRING, `position` BIGINT, `properties` ROW<`struct` ROW<`fields` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `struct_json` STRING>, `feature_stage` ROW<`features` ROW<`numeric` ARRAY NOT NULL>, `categorical` ARRAY NOT NULL>, `sparse` ARRAY NOT NULL>, `sparse_id` ARRAY NOT NULL>, `embeddings` ARRAY>> NOT NULL>, `feature_references` ARRAY NOT NULL>, `sparse_id_list` ARRAY>> NOT NULL>, `user_events` ROW<`user_events` ARRAY NOT NULL>, `user_events_all` ARRAY NOT NULL>>, `string_features` ARRAY NOT NULL>>>, `personalize_stage` ROW<`personalize_ranking_score` FLOAT, `score_source` STRING, `personalize_ranking_scores` ARRAY> NOT NULL>, `model_scores` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `predictor_stage` ROW<`model_scores` ARRAY NOT NULL>, `backoff_predictors` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `blender_stage` ROW<`score` FLOAT, `steps` ARRAY, `boost_step` ROW<`fid` BIGINT, `delta` FLOAT>> NOT NULL>, `sort_key` ARRAY, `experiments` ARRAY NOT NULL>>, `retrieval_rank` BIGINT, `retrieval_score` FLOAT> NOT NULL>, `latency` ARRAY NOT NULL>, `execution_stats` ROW<`stages` ARRAY NOT NULL>>> NOT NULL>>, `request_feature_stage` ROW<`features` ROW<`numeric` ARRAY NOT NULL>, `categorical` ARRAY NOT NULL>, `sparse` ARRAY NOT NULL>, `sparse_id` ARRAY NOT NULL>, `embeddings` ARRAY>> NOT NULL>, `feature_references` ARRAY NOT NULL>, `sparse_id_list` ARRAY>> NOT NULL>, `user_events` ROW<`user_events` ARRAY NOT NULL>, `user_events_all` ARRAY NOT NULL>>, `string_features` ARRAY NOT NULL>>>, `user_feature_stage` ROW<`features` ROW<`numeric` ARRAY NOT NULL>, `categorical` ARRAY NOT NULL>, `sparse` ARRAY NOT NULL>, `sparse_id` ARRAY NOT NULL>, `embeddings` ARRAY>> NOT NULL>, `feature_references` ARRAY NOT NULL>, `sparse_id_list` ARRAY>> NOT NULL>, `user_events` ROW<`user_events` ARRAY NOT NULL>, `user_events_all` ARRAY NOT NULL>>, `string_features` ARRAY NOT NULL>>>, `model_ref` ARRAY NOT NULL>, `server_version` STRING, `after_response_stage` ROW<`removed_execution_insertion_count` INT>, `personalize_stage` ROW<`personalize_ranking_score` FLOAT, `score_source` STRING, `personalize_ranking_scores` ARRAY> NOT NULL>, `model_scores` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `predictor_stage` ROW<`model_scores` ARRAY NOT NULL>, `backoff_predictors` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `blender_config` STRING, `hyperloop_log` ROW<`parameter_logs` ARRAY> NOT NULL>>, `blender_session_log` ROW<`config_statements` ARRAY, `ids` ARRAY, `variable_logs` ARRAY> NOT NULL>, `allocation_logs` ARRAY, `name` STRING, `positions_considered` ARRAY, `positions_filled` ARRAY> NOT NULL>>, `experiments` ARRAY NOT NULL>, `effective_user_info` ROW<`user_id` STRING, `log_user_id` STRING, `is_internal_user` BOOLEAN, `ignore_usage` BOOLEAN, `anon_user_id` STRING, `retained_user_id` STRING>>);"); - assertThatThrownBy( + assertThatCode( () -> tEnv.executeSql( - "INSERT INTO tmp VALUES (CAST(NULL AS ROW<`execution_server` STRING, `execution_insertion` ARRAY, `timing` ROW<`client_log_timestamp` BIGINT, `event_api_timestamp` BIGINT, `log_timestamp` BIGINT, `processing_timestamp` BIGINT>, `client_info` ROW<`client_type` STRING, `traffic_type` STRING>, `insertion_id` STRING, `request_id` STRING, `view_id` STRING, `auto_view_id` STRING, `session_id` STRING, `content_id` STRING, `position` BIGINT, `properties` ROW<`struct` ROW<`fields` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `struct_json` STRING>, `feature_stage` ROW<`features` ROW<`numeric` ARRAY NOT NULL>, `categorical` ARRAY NOT NULL>, `sparse` ARRAY NOT NULL>, `sparse_id` ARRAY NOT NULL>, `embeddings` ARRAY>> NOT NULL>, `feature_references` ARRAY NOT NULL>, `sparse_id_list` ARRAY>> NOT NULL>, `user_events` ROW<`user_events` ARRAY NOT NULL>, `user_events_all` ARRAY NOT NULL>>, `string_features` ARRAY NOT NULL>>>, `personalize_stage` ROW<`personalize_ranking_score` FLOAT, `score_source` STRING, `personalize_ranking_scores` ARRAY> NOT NULL>, `model_scores` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `predictor_stage` ROW<`model_scores` ARRAY NOT NULL>, `backoff_predictors` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `blender_stage` ROW<`score` FLOAT, `steps` ARRAY, `boost_step` ROW<`fid` BIGINT, `delta` FLOAT>> NOT NULL>, `sort_key` ARRAY, `experiments` ARRAY NOT NULL>>, `retrieval_rank` BIGINT, `retrieval_score` FLOAT> NOT NULL>, `latency` ARRAY NOT NULL>, `execution_stats` ROW<`stages` ARRAY NOT NULL>>> NOT NULL>>, `request_feature_stage` ROW<`features` ROW<`numeric` ARRAY NOT NULL>, `categorical` ARRAY NOT NULL>, `sparse` ARRAY NOT NULL>, `sparse_id` ARRAY NOT NULL>, `embeddings` ARRAY>> NOT NULL>, `feature_references` ARRAY NOT NULL>, `sparse_id_list` ARRAY>> NOT NULL>, `user_events` ROW<`user_events` ARRAY NOT NULL>, `user_events_all` ARRAY NOT NULL>>, `string_features` ARRAY NOT NULL>>>, `user_feature_stage` ROW<`features` ROW<`numeric` ARRAY NOT NULL>, `categorical` ARRAY NOT NULL>, `sparse` ARRAY NOT NULL>, `sparse_id` ARRAY NOT NULL>, `embeddings` ARRAY>> NOT NULL>, `feature_references` ARRAY NOT NULL>, `sparse_id_list` ARRAY>> NOT NULL>, `user_events` ROW<`user_events` ARRAY NOT NULL>, `user_events_all` ARRAY NOT NULL>>, `string_features` ARRAY NOT NULL>>>, `model_ref` ARRAY NOT NULL>, `server_version` STRING, `after_response_stage` ROW<`removed_execution_insertion_count` INT>, `personalize_stage` ROW<`personalize_ranking_score` FLOAT, `score_source` STRING, `personalize_ranking_scores` ARRAY> NOT NULL>, `model_scores` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `predictor_stage` ROW<`model_scores` ARRAY NOT NULL>, `backoff_predictors` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `blender_config` STRING, `hyperloop_log` ROW<`parameter_logs` ARRAY> NOT NULL>>, `blender_session_log` ROW<`config_statements` ARRAY, `ids` ARRAY, `variable_logs` ARRAY> NOT NULL>, `allocation_logs` ARRAY, `name` STRING, `positions_considered` ARRAY, `positions_filled` ARRAY> NOT NULL>>, `experiments` ARRAY NOT NULL>, `effective_user_info` ROW<`user_id` STRING, `log_user_id` STRING, `is_internal_user` BOOLEAN, `ignore_usage` BOOLEAN, `anon_user_id` STRING, `retained_user_id` STRING>>))") - .print()) + "INSERT INTO tmp VALUES (CAST(NULL AS ROW<`execution_server` STRING, `execution_insertion` ARRAY, `timing` ROW<`client_log_timestamp` BIGINT, `event_api_timestamp` BIGINT, `log_timestamp` BIGINT, `processing_timestamp` BIGINT>, `client_info` ROW<`client_type` STRING, `traffic_type` STRING>, `insertion_id` STRING, `request_id` STRING, `view_id` STRING, `auto_view_id` STRING, `session_id` STRING, `content_id` STRING, `position` BIGINT, `properties` ROW<`struct` ROW<`fields` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>>> NOT NULL>>, `list_value` ROW<`values` ARRAY> NOT NULL>>, `list_value` ROW<`values` ARRAY NOT NULL>>> NOT NULL>>> NOT NULL>>> NOT NULL>>>> NOT NULL>>, `struct_json` STRING>, `feature_stage` ROW<`features` ROW<`numeric` ARRAY NOT NULL>, `categorical` ARRAY NOT NULL>, `sparse` ARRAY NOT NULL>, `sparse_id` ARRAY NOT NULL>, `embeddings` ARRAY>> NOT NULL>, `feature_references` ARRAY NOT NULL>, `sparse_id_list` ARRAY>> NOT NULL>, `user_events` ROW<`user_events` ARRAY NOT NULL>, `user_events_all` ARRAY NOT NULL>>, `string_features` ARRAY NOT NULL>>>, `personalize_stage` ROW<`personalize_ranking_score` FLOAT, `score_source` STRING, `personalize_ranking_scores` ARRAY> NOT NULL>, `model_scores` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `predictor_stage` ROW<`model_scores` ARRAY NOT NULL>, `backoff_predictors` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `blender_stage` ROW<`score` FLOAT, `steps` ARRAY, `boost_step` ROW<`fid` BIGINT, `delta` FLOAT>> NOT NULL>, `sort_key` ARRAY, `experiments` ARRAY NOT NULL>>, `retrieval_rank` BIGINT, `retrieval_score` FLOAT> NOT NULL>, `latency` ARRAY NOT NULL>, `execution_stats` ROW<`stages` ARRAY NOT NULL>>> NOT NULL>>, `request_feature_stage` ROW<`features` ROW<`numeric` ARRAY NOT NULL>, `categorical` ARRAY NOT NULL>, `sparse` ARRAY NOT NULL>, `sparse_id` ARRAY NOT NULL>, `embeddings` ARRAY>> NOT NULL>, `feature_references` ARRAY NOT NULL>, `sparse_id_list` ARRAY>> NOT NULL>, `user_events` ROW<`user_events` ARRAY NOT NULL>, `user_events_all` ARRAY NOT NULL>>, `string_features` ARRAY NOT NULL>>>, `user_feature_stage` ROW<`features` ROW<`numeric` ARRAY NOT NULL>, `categorical` ARRAY NOT NULL>, `sparse` ARRAY NOT NULL>, `sparse_id` ARRAY NOT NULL>, `embeddings` ARRAY>> NOT NULL>, `feature_references` ARRAY NOT NULL>, `sparse_id_list` ARRAY>> NOT NULL>, `user_events` ROW<`user_events` ARRAY NOT NULL>, `user_events_all` ARRAY NOT NULL>>, `string_features` ARRAY NOT NULL>>>, `model_ref` ARRAY NOT NULL>, `server_version` STRING, `after_response_stage` ROW<`removed_execution_insertion_count` INT>, `personalize_stage` ROW<`personalize_ranking_score` FLOAT, `score_source` STRING, `personalize_ranking_scores` ARRAY> NOT NULL>, `model_scores` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `predictor_stage` ROW<`model_scores` ARRAY NOT NULL>, `backoff_predictors` ARRAY NOT NULL>, `models` ARRAY NOT NULL>>, `blender_config` STRING, `hyperloop_log` ROW<`parameter_logs` ARRAY> NOT NULL>>, `blender_session_log` ROW<`config_statements` ARRAY, `ids` ARRAY, `variable_logs` ARRAY> NOT NULL>, `allocation_logs` ARRAY, `name` STRING, `positions_considered` ARRAY, `positions_filled` ARRAY> NOT NULL>>, `experiments` ARRAY NOT NULL>, `effective_user_info` ROW<`user_id` STRING, `log_user_id` STRING, `is_internal_user` BOOLEAN, `ignore_usage` BOOLEAN, `anon_user_id` STRING, `retained_user_id` STRING>>))")) .doesNotThrowAnyException(); }