Allow flexible column names
psainics committed Nov 21, 2023
1 parent e541bc6 commit b24053f
Showing 21 changed files with 1,443 additions and 289 deletions.
docs/BigQueryMultiTable-batchsink.md (6 additions, 0 deletions)
@@ -255,3 +255,9 @@ GET https://www.googleapis.com/bigquery/v2/projects/xxxx/datasets/mysql_bq_perm?
have the permission to read the dataset you specified in this plugin. You must grant "BigQuery Data Editor" role on the
project identified by the `Dataset Project ID` you specified in this plugin to the service account. If you think you
already granted the role, check if you granted the role on the wrong project (for example the one identified by the `Project ID`).
+
+Column Names
+------------
+A column name can contain letters (a-z, A-Z), numbers (0-9), or underscores (_), and it must start with a letter or
+underscore. For more flexible column name support, see
+[flexible column names](https://cloud.google.com/bigquery/docs/schemas#flexible-column-names).
docs/BigQueryTable-batchsink.md (6 additions, 0 deletions)
@@ -298,3 +298,9 @@ GET https://www.googleapis.com/bigquery/v2/projects/xxxx/datasets/mysql_bq_perm?
have the permission to read the dataset you specified in this plugin. You must grant "BigQuery Data Editor" role on the
project identified by the `Dataset Project ID` you specified in this plugin to the service account. If you think you
already granted the role, check if you granted the role on the wrong project (for example the one identified by the `Project ID`).
+
+Column Names
+------------
+A column name can contain letters (a-z, A-Z), numbers (0-9), or underscores (_), and it must start with a letter or
+underscore. For more flexible column name support, see
+[flexible column names](https://cloud.google.com/bigquery/docs/schemas#flexible-column-names).
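As a quick illustration (not part of the commit itself), the Unicode character class that this change introduces in the sink config can be exercised directly; names such as `注文ID` or `total%` now pass, while `price!!` still fails:

```java
import java.util.regex.Pattern;

public class ColumnNameCheck {
    // Same character class the sink config in this commit uses for flexible
    // column names: Unicode letters, marks, numbers, connector/dash
    // punctuation, a few special characters, and spaces.
    private static final Pattern FLEXIBLE_NAME =
        Pattern.compile("[\\p{L}\\p{M}\\p{N}\\p{Pc}\\p{Pd}&%+=:'<>#| ]+");

    public static void main(String[] args) {
        System.out.println(FLEXIBLE_NAME.matcher("order_id").matches()); // true
        System.out.println(FLEXIBLE_NAME.matcher("注文ID").matches());    // true: Unicode letters
        System.out.println(FLEXIBLE_NAME.matcher("total%").matches());   // true: '%' is allowed
        System.out.println(FLEXIBLE_NAME.matcher("price!!").matches());  // false: '!' is not allowed
    }
}
```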
@@ -21,7 +21,6 @@
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.Table;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
-import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
@@ -36,12 +35,12 @@
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.plugin.common.Asset;
+import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.common.GCPUtils;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.slf4j.Logger;
@@ -55,7 +54,6 @@
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
-
import javax.annotation.Nullable;

/**
@@ -51,7 +51,6 @@
import com.google.cloud.hadoop.io.bigquery.BigQueryFactory;
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
import com.google.cloud.hadoop.io.bigquery.BigQueryHelper;
-import com.google.cloud.hadoop.io.bigquery.BigQueryStrings;
import com.google.cloud.hadoop.io.bigquery.BigQueryUtils;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration;
import com.google.cloud.hadoop.io.bigquery.output.ForwardingBigQueryFileOutputCommitter;
@@ -62,6 +61,7 @@
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryStrings;
import io.cdap.plugin.gcp.bigquery.source.BigQueryFactoryWithScopes;
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
@@ -25,7 +25,6 @@
import com.google.cloud.bigquery.JobStatistics;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TimePartitioning;
-import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import io.cdap.cdap.api.annotation.Description;
@@ -44,6 +43,7 @@
import io.cdap.cdap.etl.api.engine.sql.SQLEngineOutput;
import io.cdap.cdap.etl.common.Constants;
import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnector;
+import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQuerySQLEngine;
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQueryWrite;
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
@@ -58,7 +58,8 @@ public final class BigQuerySinkConfig extends AbstractBigQuerySinkConfig {
private static final String WHERE = "WHERE";
public static final Set<Schema.Type> SUPPORTED_CLUSTERING_TYPES =
ImmutableSet.of(Schema.Type.INT, Schema.Type.LONG, Schema.Type.STRING, Schema.Type.BOOLEAN, Schema.Type.BYTES);
-private static final Pattern FIELD_PATTERN = Pattern.compile("[a-zA-Z0-9_]+");
+// Read more: https://cloud.google.com/bigquery/docs/schemas#flexible-column-names
+private static final Pattern FIELD_PATTERN = Pattern.compile("[\\p{L}\\p{M}\\p{N}\\p{Pc}\\p{Pd}&%+=:'<>#| ]+");

public static final String NAME_TABLE = "table";
public static final String NAME_SCHEMA = "schema";
@@ -75,6 +76,7 @@ public final class BigQuerySinkConfig extends AbstractBigQuerySinkConfig {
public static final String NAME_RANGE_INTERVAL = "rangeInterval";

public static final int MAX_NUMBER_OF_COLUMNS = 4;
+private static final int MAX_LENGTH_OF_COLUMN_NAME = 300;

@Name(NAME_TABLE)
@Macro
@@ -345,9 +347,18 @@ public void validate(@Nullable Schema inputSchema, @Nullable Schema outputSchema
String name = field.getName();
// BigQuery column names only allow alphanumeric characters and _
// https://cloud.google.com/bigquery/docs/schemas#column_names
+// Allow support for flexible column names
+// https://cloud.google.com/bigquery/docs/schemas#flexible-column-names
if (!FIELD_PATTERN.matcher(name).matches()) {
collector.addFailure(String.format("Output field '%s' must only contain alphanumeric characters and '_'.",
name), null).withOutputSchemaField(name);
collector.addFailure(String.format("Output field '%s' contains invalid characters. " +
"Check column names docs for more details.",
name), null).withOutputSchemaField(name);
}
+
+// Check if the field name exceeds the maximum length of 300 characters.
+if (name.length() > MAX_LENGTH_OF_COLUMN_NAME) {
+collector.addFailure(String.format("Output field '%s' exceeds the maximum length of 300 characters.",
+name), null).withOutputSchemaField(name);
+}

// check if the required fields are present in the input schema.
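Taken together, the new checks accept any name that matches `FIELD_PATTERN` and is at most 300 characters long. Below is a minimal standalone sketch of that logic; `validateFieldName` is a hypothetical helper standing in for the plugin's `FailureCollector` reporting:

```java
import java.util.regex.Pattern;

public class FieldNameValidation {
    // Pattern and limit as introduced in BigQuerySinkConfig by this commit.
    private static final Pattern FIELD_PATTERN =
        Pattern.compile("[\\p{L}\\p{M}\\p{N}\\p{Pc}\\p{Pd}&%+=:'<>#| ]+");
    private static final int MAX_LENGTH_OF_COLUMN_NAME = 300;

    /** Hypothetical helper: returns an error message, or null when the name is valid. */
    static String validateFieldName(String name) {
        if (!FIELD_PATTERN.matcher(name).matches()) {
            return String.format("Output field '%s' contains invalid characters.", name);
        }
        if (name.length() > MAX_LENGTH_OF_COLUMN_NAME) {
            return String.format("Output field '%s' exceeds the maximum length of 300 characters.", name);
        }
        return null;
    }
}
```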
@@ -28,9 +28,6 @@
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
-import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration;
-import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
-import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableSchema;
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
@@ -43,6 +40,9 @@
import io.cdap.cdap.etl.api.validation.ValidationFailure;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.LineageRecorder;
+import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryOutputConfiguration;
+import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
+import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableSchema;
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize.Numeric;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
@@ -62,6 +62,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

@@ -611,6 +612,13 @@ private static BigQueryFileFormat getFileFormat(List<BigQueryTableFieldSchema> f
if (DATETIME.equals(field.getType())) {
return BigQueryFileFormat.NEWLINE_DELIMITED_JSON;
}
+// If the field name is not in English characters, then we will use JSON format
+// We do this as the Avro load job in BQ does not support non-English characters in field names for now
+String fieldName = field.getName();
+final String englishCharactersRegex = "[\\w]+";
+if (!Pattern.matches(englishCharactersRegex, fieldName)) {
+return BigQueryFileFormat.NEWLINE_DELIMITED_JSON;
+}
// If the field is a record we have to check its subfields.
if (RECORD.equals(field.getType())) {
if (getFileFormat(field.getFields()) == BigQueryFileFormat.NEWLINE_DELIMITED_JSON) {
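The hunk above changes how the load format is chosen: if any field name, recursively through `RECORD` subfields, falls outside `[\w]+` (ASCII letters, digits, underscore), the sink falls back from Avro to newline-delimited JSON, because BigQuery's Avro load jobs do not yet accept such names. A simplified sketch of that walk, using a hypothetical `Field` record in place of `BigQueryTableFieldSchema`:

```java
import java.util.List;
import java.util.regex.Pattern;

public class LoadFormatChoice {
    enum Format { AVRO, NEWLINE_DELIMITED_JSON }

    // Hypothetical stand-in for BigQueryTableFieldSchema; subFields is only
    // populated for RECORD fields.
    record Field(String name, String type, List<Field> subFields) {}

    // Java's \w is ASCII-only: [A-Za-z0-9_]. Anything else forces JSON.
    private static final Pattern ENGLISH_CHARACTERS = Pattern.compile("[\\w]+");

    static Format getFileFormat(List<Field> fields) {
        for (Field field : fields) {
            // DATETIME fields and non-ASCII names are only loadable from JSON today.
            if ("DATETIME".equals(field.type())
                || !ENGLISH_CHARACTERS.matcher(field.name()).matches()) {
                return Format.NEWLINE_DELIMITED_JSON;
            }
            // A RECORD forces JSON if any of its subfields does.
            if ("RECORD".equals(field.type())
                && getFileFormat(field.subFields()) == Format.NEWLINE_DELIMITED_JSON) {
                return Format.NEWLINE_DELIMITED_JSON;
            }
        }
        return Format.AVRO;
    }
}
```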
@@ -17,8 +17,8 @@
package io.cdap.plugin.gcp.bigquery.sink;

import com.google.cloud.bigquery.DatasetId;
-import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -17,9 +17,9 @@
package io.cdap.plugin.gcp.bigquery.sink;

import com.google.cloud.bigquery.DatasetId;
-import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
Expand Down