Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PLUGIN-1563] Add support in BQ Source to read JSON columns #50

Merged
merged 1 commit into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ private static void addToRecordBuilder(StructuredRecord.Builder recordBuilder, S
*/
public static Object convertValue(Field field, FieldValue fieldValue) {
LegacySQLTypeName type = field.getType();
StandardSQLTypeName standardType = type.getStandardType();
// Treat JSON as string
StandardSQLTypeName standardType = LegacySQLTypeName.valueOf("JSON").equals(type) ?
StandardSQLTypeName.STRING : type.getStandardType();
switch (standardType) {
case TIME:
return LocalTime.parse(fieldValue.getStringValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,9 @@ public static Schema convertFieldType(Field field, @Nullable FailureCollector co
public static Schema convertFieldType(Field field, @Nullable FailureCollector collector,
@Nullable String recordPrefix) {
LegacySQLTypeName type = field.getType();
StandardSQLTypeName standardType = type.getStandardType();
// Treat JSON as string
StandardSQLTypeName standardType = LegacySQLTypeName.valueOf("JSON").equals(type) ?
StandardSQLTypeName.STRING : type.getStandardType();
switch (standardType) {
case FLOAT64:
// float is a float64, so corresponding type becomes double
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.FieldValue;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.TableResult;
import com.google.common.io.BaseEncoding;
Expand Down Expand Up @@ -159,4 +160,13 @@ private String paddHeaddingZero(String value, int length) {
}
return Strings.repeat('0', length - value.length()) + value;
}

@Test
public void testJsonFieldConversionToString() {
Field field = Field.newBuilder("demo", LegacySQLTypeName.valueOf("JSON")).build();
String jsonValue = "{\"key\":\"value\"}";
FieldValue fieldValue = FieldValue.of(FieldValue.Attribute.PRIMITIVE, jsonValue);
Object result = BigQueryDataParser.convertValue(field, fieldValue);
Assert.assertEquals(jsonValue, result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.cdap.plugin.gcp.bigquery.util;

import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.StandardSQLTypeName;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;
Expand Down Expand Up @@ -470,4 +471,11 @@ public void testJobLabelWithValueStartingWithCaptialLetter() {
collector.getValidationFailures().get(0).getMessage());
}

@Test
public void testConvertFieldTypeJsonToString() {
Field field = Field.newBuilder("demo", LegacySQLTypeName.valueOf("JSON")).build();
Schema expectedSchema = Schema.of(Schema.Type.STRING);
Schema result = BigQueryUtil.convertFieldType(field, null, null);
Assert.assertEquals(expectedSchema, result);
}
}