Skip to content

Commit

Permalink
Merge pull request #1446 from cloudsufi/cherry-pick/rc-0.23.3
Browse files Browse the repository at this point in the history
[🍒][PLUGIN-430][PLUGIN-1803][PLUGIN-1805][PLUGIN-1742] GCS/BQ Patch
  • Loading branch information
psainics authored Sep 18, 2024
2 parents 546fc7b + 5d1701e commit b655c99
Show file tree
Hide file tree
Showing 12 changed files with 137 additions and 47 deletions.
2 changes: 0 additions & 2 deletions docs/BigQueryArgumentSetter-action.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@

Properties
----------
**Reference Name:** Name used to uniquely identify this source for lineage, annotating metadata, etc.

**Project ID**: Google Cloud Project ID, which uniquely identifies a project.
It can be found on the Dashboard in the Google Cloud Platform Console. This is the project
that the BigQuery job will run in. If a temporary bucket needs to be created, the service account
Expand Down
11 changes: 11 additions & 0 deletions docs/BigQueryTable-batchsink.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,17 @@ is ignored if the table already exists.
**Time Partitioning Type**: Specifies the time partitioning type. Can either be Daily or Hourly or Monthly or Yearly.
Default is Daily. Ignored when the table already exists.

> The table below shows the compatibility of different time schema types with various time partitioning types in BigQuery.

| Schema Type / Partition Type | Hourly | Daily | Monthly | Yearly |
|-------------------------| ------- | ------- | ------- | ------- |
| TIMESTAMP_MILLIS | ✓ | ✓ | ✓ | ✓ |
| TIMESTAMP_MICROS | ✓ | ✓ | ✓ | ✓ |
| DATETIME | ✓ | ✓ | ✓ | ✓ |
| DATE | ✗ | ✓ | ✓ | ✓ |
| TIME_MILLIS | ✗ | ✗ | ✗ | ✗ |
| TIME_MICROS | ✗ | ✗ | ✗ | ✗ |

**Range Start**: For integer partitioning, specifies the start of the range. Only used when table doesn’t
exist already, and partitioning type is set to Integer.
* The start value is inclusive.
Expand Down
2 changes: 0 additions & 2 deletions docs/GCSArgumentSetter-action.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ must be readable by all users running the job.

Properties
----------
**Reference Name:** Name used to uniquely identify this source for lineage, annotating metadata, etc.

**Project ID**: Google Cloud Project ID, which uniquely identifies a project.
It can be found on the Dashboard in the Google Cloud Platform Console. This is the project
that the BigQuery job will run in. If a temporary bucket needs to be created, the service account
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
<bigquery.connector.hadoop2.version>hadoop2-1.2.0</bigquery.connector.hadoop2.version>
<commons.codec.version>1.4</commons.codec.version>
<cdap.version>6.9.1</cdap.version>
<cdap.plugin.version>2.11.1</cdap.plugin.version>
<cdap.plugin.version>2.12.1</cdap.plugin.version>
<dropwizard.metrics-core.version>3.2.6</dropwizard.metrics-core.version>
<flogger.system.backend.version>0.7.1</flogger.system.backend.version>
<gcs.connector.version>hadoop2-2.2.9</gcs.connector.version>
Expand Down Expand Up @@ -1245,7 +1245,7 @@
<dependency>
<groupId>io.cdap.tests.e2e</groupId>
<artifactId>cdap-e2e-framework</artifactId>
<version>0.3.1</version>
<version>0.3.2</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,23 +506,21 @@ private void validateTimePartitioningColumn(String columnName, FailureCollector

boolean isTimestamp = logicalType == LogicalType.TIMESTAMP_MICROS || logicalType == LogicalType.TIMESTAMP_MILLIS;
boolean isDate = logicalType == LogicalType.DATE;
boolean isTimestampOrDate = isTimestamp || isDate;

// If timePartitioningType is HOUR, then logicalType cannot be DATE Only TIMESTAMP_MICROS and TIMESTAMP_MILLIS
if (timePartitioningType == TimePartitioning.Type.HOUR && !isTimestamp) {
collector.addFailure(
String.format("Partition column '%s' is of invalid type '%s'.",
boolean isDateTime = logicalType == LogicalType.DATETIME;
boolean isTimestampOrDateOrDateTime = isTimestamp || isDate || isDateTime;
boolean isTimestampOrDateTime = isTimestamp || isDateTime;
// TimePartitioningType HOUR is supported by TIMESTAMP_MICROS, TIMESTAMP_MILLIS, DATETIME
if (timePartitioningType == TimePartitioning.Type.HOUR && !isTimestampOrDateTime) {
collector.addFailure(String.format("Partition column '%s' is of invalid type '%s'.",
columnName, fieldSchema.getDisplayName()),
"Partition column must be a timestamp.").withConfigProperty(NAME_PARTITION_BY_FIELD)
.withOutputSchemaField(columnName).withInputSchemaField(columnName);

// For any other timePartitioningType (DAY, MONTH, YEAR) logicalType can be DATE, TIMESTAMP_MICROS, TIMESTAMP_MILLIS
} else if (!isTimestampOrDate) {
collector.addFailure(
String.format("Partition column '%s' is of invalid type '%s'.",
"Partition column must be of type TIMESTAMP or DATETIME")
.withConfigProperty(NAME_PARTITION_BY_FIELD).withOutputSchemaField(columnName).withInputSchemaField(columnName);
// TimePartitioningType (DAY, MONTH, YEAR) are supported by TIMESTAMP_MICROS, TIMESTAMP_MILLIS, DATE, DATETIME
} else if (!isTimestampOrDateOrDateTime) {
collector.addFailure(String.format("Partition column '%s' is of invalid type '%s'.",
columnName, fieldSchema.getDisplayName()),
"Partition column must be a date or timestamp.").withConfigProperty(NAME_PARTITION_BY_FIELD)
.withOutputSchemaField(columnName).withInputSchemaField(columnName);
"Partition column must be of type TIMESTAMP, DATE or DATETIME")
.withConfigProperty(NAME_PARTITION_BY_FIELD).withOutputSchemaField(columnName).withInputSchemaField(columnName);
}
}

Expand Down
23 changes: 18 additions & 5 deletions src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -438,15 +438,28 @@ public static ValidationFailure validateFieldSchemaMatches(Field bqField, Schema
if (bqField.getMode() == Field.Mode.REPEATED) {
fieldSchema = fieldSchema.getComponentSchema();
type = fieldSchema.getType();
logicalType = fieldSchema.getLogicalType();
}
}

String[] incompatibleFieldErrorMessage = {
String.format("Field '%s' of type '%s' is incompatible with column '%s' of type '%s' in BigQuery table '%s.%s'.",
field.getName(), fieldSchema.getDisplayName(), bqField.getName(),
BQ_TYPE_MAP.get(bqField.getType()), dataset, table) ,
String.format("It must be of type '%s'.", BQ_TYPE_MAP.get(bqField.getType()))
};
if (logicalType != null) {
if (LOGICAL_TYPE_MAP.get(logicalType) != null && !LOGICAL_TYPE_MAP.get(logicalType).contains(bqField.getType())) {
return collector.addFailure(incompatibleFieldErrorMessage[0], incompatibleFieldErrorMessage[1]);
}

// Return once logical types are validated. This is because logical types are represented as primitive types
// internally.
return null;
}

if (TYPE_MAP.get(type) != null && !TYPE_MAP.get(type).contains(bqField.getType())) {
return collector.addFailure(
String.format("Field '%s' of type '%s' is incompatible with column '%s' of type '%s' " +
"in BigQuery table '%s.%s'.", field.getName(), fieldSchema.getDisplayName(), bqField.getName(),
BQ_TYPE_MAP.get(bqField.getType()), dataset, table),
String.format("It must be of type '%s'.", BQ_TYPE_MAP.get(bqField.getType())));
return collector.addFailure(incompatibleFieldErrorMessage[0], incompatibleFieldErrorMessage[1]);
}
return null;
}
Expand Down
30 changes: 30 additions & 0 deletions src/main/java/io/cdap/plugin/gcp/common/GCSEmptyInputFormat.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*/

package io.cdap.plugin.gcp.common;

import io.cdap.plugin.format.input.AbstractEmptyInputFormat;


/**
 * An InputFormat that returns no data.
 *
 * <p>Returned by {@code GCSSource#getEmptyInputFormatClassName()} so that a GCS source
 * configured to allow empty input can produce zero records instead of failing.
 * All splitting and record-reading behavior is inherited unchanged from
 * {@link AbstractEmptyInputFormat}.
 *
 * @param <K> the type of key
 * @param <V> the type of value
 */
public class GCSEmptyInputFormat<K, V> extends AbstractEmptyInputFormat<K, V> {
  // no-op: AbstractEmptyInputFormat already supplies the empty-split behavior
}
11 changes: 6 additions & 5 deletions src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.cdap.plugin.format.plugin.FileSourceProperties;
import io.cdap.plugin.gcp.common.GCPConnectorConfig;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.common.GCSEmptyInputFormat;
import io.cdap.plugin.gcp.crypto.EncryptedFileSystem;
import io.cdap.plugin.gcp.gcs.GCSPath;
import io.cdap.plugin.gcp.gcs.connector.GCSConnector;
Expand Down Expand Up @@ -77,6 +78,11 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
super.configurePipeline(pipelineConfigurer);
}

@Override
protected String getEmptyInputFormatClassName() {
return GCSEmptyInputFormat.class.getName();
}

@Override
public void prepareRun(BatchSourceContext context) throws Exception {
// Get location of the source for lineage
Expand Down Expand Up @@ -268,11 +274,6 @@ public Long getMinSplitSize() {
return minSplitSize;
}

@Override
public boolean shouldAllowEmptyInput() {
return false;
}

public boolean isCopyHeader() {
return shouldCopyHeader();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,34 @@ public void testValidateTimePartitioningColumnWithNullAndDate() throws
Assert.assertEquals(0, collector.getValidationFailures().size());
}

@Test
public void testValidateTimePartitioningColumnWithMonthAndDateTime() throws
  InvocationTargetException, IllegalAccessException {
  // A DATETIME column must be accepted as a partition column for MONTH
  // time partitioning, so validation should record no failures.
  Schema datetimeSchema = Schema.of(Schema.LogicalType.DATETIME);
  Schema nonNullableSchema = datetimeSchema.isNullable() ? datetimeSchema.getNonNullable() : datetimeSchema;

  validateTimePartitioningColumnMethod.invoke(config, "partitionFrom", collector, nonNullableSchema,
                                              TimePartitioning.Type.MONTH);

  Assert.assertEquals(0, collector.getValidationFailures().size());
}

@Test
public void testValidateTimePartitioningColumnWithHourAndDateTime() throws
  InvocationTargetException, IllegalAccessException {
  // A DATETIME column must be accepted as a partition column for HOUR
  // time partitioning, so validation should record no failures.
  Schema datetimeSchema = Schema.of(Schema.LogicalType.DATETIME);
  Schema nonNullableSchema = datetimeSchema.isNullable() ? datetimeSchema.getNonNullable() : datetimeSchema;

  validateTimePartitioningColumnMethod.invoke(config, "partitionFrom", collector, nonNullableSchema,
                                              TimePartitioning.Type.HOUR);

  Assert.assertEquals(0, collector.getValidationFailures().size());
}

@Test
public void testValidateColumnNameWithValidColumnName() {
String columnName = "test";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.validation.ValidationFailure;
import io.cdap.cdap.etl.mock.validation.MockFailureCollector;
import io.cdap.plugin.gcp.bigquery.source.BigQuerySourceConfig;
import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize.BigNumeric;
import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize.Numeric;
import io.cdap.plugin.gcp.common.GCPUtils;
Expand Down Expand Up @@ -478,4 +479,32 @@ public void testConvertFieldTypeJsonToString() {
Schema result = BigQueryUtil.convertFieldType(field, null, null);
Assert.assertEquals(expectedSchema, result);
}

@Test
public void testValidateFieldSchemaMatchesDate() {
  // A REPEATED BigQuery DATE column is compatible with a nullable array of
  // DATE logical-type elements, so no validation failure should be produced.
  MockFailureCollector failureCollector = new MockFailureCollector();
  Field bqDateColumn = Field.newBuilder("testFieldRepeatedDate", StandardSQLTypeName.DATE)
    .setMode(Field.Mode.REPEATED).build();
  Schema.Field dateArrayField = Schema.Field.of("testFieldRepeatedDate",
    Schema.nullableOf(Schema.arrayOf(Schema.of(Schema.LogicalType.DATE))));

  ValidationFailure failure = BigQueryUtil.validateFieldSchemaMatches(bqDateColumn, dateArrayField, "dataset",
    "table", BigQuerySourceConfig.SUPPORTED_TYPES, failureCollector);

  Assert.assertNull(failure);
  Assert.assertEquals(0, failureCollector.getValidationFailures().size());
}

@Test
public void testValidateFieldSchemaNotMatchesDate() {
  // A STRING array element type is incompatible with a REPEATED BigQuery DATE
  // column: exactly one failure is expected, with the full incompatibility message.
  MockFailureCollector failureCollector = new MockFailureCollector();
  Field bqDateColumn = Field.newBuilder("testFieldRepeatedDate", StandardSQLTypeName.DATE)
    .setMode(Field.Mode.REPEATED).build();
  Schema.Field stringArrayField = Schema.Field.of("testFieldRepeatedDate",
    Schema.nullableOf(Schema.arrayOf(Schema.of(Schema.Type.STRING))));

  ValidationFailure failure = BigQueryUtil.validateFieldSchemaMatches(bqDateColumn, stringArrayField, "dataset",
    "table", BigQuerySourceConfig.SUPPORTED_TYPES, failureCollector);

  Assert.assertNotNull(failure);
  Assert.assertEquals(1, failureCollector.getValidationFailures().size());
  Assert.assertEquals("Field 'testFieldRepeatedDate' of type 'string' is incompatible with" +
    " column 'testFieldRepeatedDate' of type 'date' in BigQuery table 'dataset.table'.", failure.getMessage());
}
}
8 changes: 0 additions & 8 deletions widgets/BigQueryArgumentSetter-action.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,6 @@
{
"label": "Basic",
"properties": [
{
"widget-type": "textbox",
"label": "Reference Name",
"name": "referenceName",
"widget-attributes" : {
"placeholder": "Name used to identify this source for lineage"
}
},
{
"widget-type": "connection-browser",
"widget-category": "plugin",
Expand Down
8 changes: 0 additions & 8 deletions widgets/GCSArgumentSetter-action.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,6 @@
{
"label": "Basic",
"properties": [
{
"widget-type": "textbox",
"label": "Reference Name",
"name": "referenceName",
"widget-attributes": {
"placeholder": "Name used to identify this source for lineage"
}
},
{
"widget-type": "textbox",
"label": "Project ID",
Expand Down

0 comments on commit b655c99

Please sign in to comment.