Commit
Merge pull request #1270 from data-integrations/PLUGIN-1647
[PLUGIN-1647] Upgrade gcs hadoop2 connector to 2.2.9
itsankit-google authored Jul 28, 2023
2 parents 2604741 + 22bbd6e commit 2c0b6a1
Showing 9 changed files with 94 additions and 40 deletions.
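Most of the Java changes in this commit follow one pattern: in the upgraded bigdataoss connectors, BigQueryConfiguration's plain String *_KEY constants (for example OUTPUT_TABLE_WRITE_DISPOSITION_KEY) became HadoopConfigurationProperty objects, so call sites now read the key via getKey(). A minimal before/after sketch, not taken from the commit itself (the "WRITE_APPEND" value is only an illustrative write disposition):

import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import org.apache.hadoop.conf.Configuration;

public class KeyMigrationSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Old connector (bigquery hadoop2-1.0.0): keys were plain String constants.
    // conf.set(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_KEY, "WRITE_APPEND");

    // New connectors (bigquery hadoop2-1.2.0, gcs hadoop2-2.2.9): keys are
    // HadoopConfigurationProperty objects, so the key string comes from getKey().
    conf.set(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION.getKey(), "WRITE_APPEND");

    System.out.println(conf.get(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION.getKey()));
  }
}
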
52 changes: 49 additions & 3 deletions pom.xml
@@ -70,13 +70,13 @@
<jee.version>7</jee.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<avro.version>1.8.2</avro.version>
<bigquery.connector.hadoop2.version>hadoop2-1.0.0</bigquery.connector.hadoop2.version>
<bigquery.connector.hadoop2.version>hadoop2-1.2.0</bigquery.connector.hadoop2.version>
<commons.codec.version>1.4</commons.codec.version>
<cdap.version>6.9.1</cdap.version>
<cdap.plugin.version>2.11.1</cdap.plugin.version>
<dropwizard.metrics-core.version>3.2.6</dropwizard.metrics-core.version>
<flogger.system.backend.version>0.3.1</flogger.system.backend.version>
<gcs.connector.version>hadoop2-2.0.0</gcs.connector.version>
<flogger.system.backend.version>0.7.1</flogger.system.backend.version>
<gcs.connector.version>hadoop2-2.2.9</gcs.connector.version>
<google.cloud.bigtable.version>1.17.1</google.cloud.bigtable.version>
<google.cloud.bigquery.version>1.137.1</google.cloud.bigquery.version>
<google.cloud.kms.version>2.0.2</google.cloud.kms.version>
@@ -341,6 +341,22 @@
<groupId>com.google.flogger</groupId>
<artifactId>flogger-log4j-backend</artifactId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
<exclusion>
<groupId>com.google.flogger</groupId>
<artifactId>flogger</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>util-hadoop</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>util</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@@ -557,12 +573,42 @@
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>util-hadoop</artifactId>
<version>${gcs.connector.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.flogger</groupId>
<artifactId>flogger</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.flogger</groupId>
<artifactId>flogger-log4j-backend</artifactId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>gcs-connector</artifactId>
<version>${gcs.connector.version}</version>
<exclusions>
<exclusion>
<artifactId>grpc-api</artifactId>
<groupId>io.grpc</groupId>
</exclusion>
<exclusion>
<artifactId>grpc-census</artifactId>
<groupId>io.grpc</groupId>
</exclusion>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
<exclusion>
<groupId>com.google.flogger</groupId>
<artifactId>flogger</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.flogger</groupId>
<artifactId>flogger-log4j-backend</artifactId>

@@ -227,7 +227,7 @@ private Configuration getBaseConfiguration(@Nullable CryptoKeyName cmekKeyName)
cmekKeyName, config.getServiceAccountType());
baseConfiguration.setBoolean(BigQueryConstants.CONFIG_ALLOW_SCHEMA_RELAXATION,
config.isAllowSchemaRelaxation());
baseConfiguration.setStrings(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_KEY,
baseConfiguration.setStrings(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION.getKey(),
config.getWriteDisposition().name());
// this setting is needed because gcs has default chunk size of 64MB. This is large default chunk size which can
// cause OOM issue if there are many tables being written. See this - CDAP-16670
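The comment above refers to shrinking the GCS upload chunk size; the lines that actually set it are collapsed out of this hunk. As a rough illustration only (the property name "fs.gs.outputstream.upload.chunk.size" is an assumption for gcs-connector 2.x, not something shown in this commit), lowering the chunk size looks like:

import org.apache.hadoop.conf.Configuration;

public class ChunkSizeSketch {
  public static void main(String[] args) {
    Configuration baseConfiguration = new Configuration();
    // Illustrative: cap the per-stream upload buffer at 8 MiB instead of the 64 MiB
    // default, so writing many tables at once does not exhaust memory (CDAP-16670).
    baseConfiguration.setInt("fs.gs.outputstream.upload.chunk.size", 8 * 1024 * 1024);
    System.out.println(baseConfiguration.get("fs.gs.outputstream.upload.chunk.size"));
  }
}
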

@@ -57,6 +57,7 @@
import com.google.cloud.hadoop.io.bigquery.output.ForwardingBigQueryFileOutputCommitter;
import com.google.cloud.hadoop.io.bigquery.output.ForwardingBigQueryFileOutputFormat;
import com.google.cloud.hadoop.util.ConfigurationUtil;
import com.google.cloud.hadoop.util.HadoopConfigurationProperty;
import com.google.cloud.hadoop.util.ResilientOperation;
import com.google.cloud.hadoop.util.RetryDeterminer;
import com.google.common.base.Strings;
@@ -487,7 +488,7 @@ private static void waitForJobCompletion(BigQueryHelper bigQueryHelper, String p
.setLocation(jobReference.getLocation());

Job pollJob = ResilientOperation.retry(
ResilientOperation.getGoogleRequestCallable(get),
get::execute,
operationBackOff,
RetryDeterminer.RATE_LIMIT_ERRORS,
IOException.class,
@@ -546,10 +547,10 @@ private static void waitForJobCompletion(BigQueryHelper bigQueryHelper, String p
private static TableReference getTableReference(Configuration conf) throws IOException {
// Ensure the BigQuery output information is valid.
String projectId = BigQueryOutputConfiguration.getProjectId(conf);
String datasetId =
ConfigurationUtil.getMandatoryConfig(conf, BigQueryConfiguration.OUTPUT_DATASET_ID_KEY);
String tableId =
ConfigurationUtil.getMandatoryConfig(conf, BigQueryConfiguration.OUTPUT_TABLE_ID_KEY);
String datasetId = ConfigurationUtil.getMandatoryConfig(conf,
BigQueryConfiguration.OUTPUT_DATASET_ID);
String tableId = ConfigurationUtil.getMandatoryConfig(conf,
BigQueryConfiguration.OUTPUT_TABLE_ID);

return new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId);
}
@@ -559,14 +560,14 @@ private static TableReference getTableReference(Configuration conf) throws IOExc
* Optional<TableSchema> instead of Optional<BigQueryTableSchema>.
*/
private static Optional<TableSchema> getTableSchema(Configuration conf) throws IOException {
String fieldsJson = conf.get(BigQueryConfiguration.OUTPUT_TABLE_SCHEMA_KEY);
String fieldsJson = conf.get(BigQueryConfiguration.OUTPUT_TABLE_SCHEMA.getKey());
if (!Strings.isNullOrEmpty(fieldsJson)) {
try {
TableSchema tableSchema = createTableSchemaFromFields(fieldsJson);
return Optional.of(tableSchema);
} catch (IOException e) {
throw new IOException(
"Unable to parse key '" + BigQueryConfiguration.OUTPUT_TABLE_SCHEMA_KEY + "'.", e);
"Unable to parse key '" + BigQueryConfiguration.OUTPUT_TABLE_SCHEMA.getKey() + "'.", e);
}
}
return Optional.empty();
@@ -748,7 +749,8 @@ private RangePartitioning createRangePartitioning(@Nullable String partitionByFi
}

private static BigQuery getBigQuery(Configuration config) throws IOException {
String projectId = ConfigurationUtil.getMandatoryConfig(config, BigQueryConfiguration.PROJECT_ID_KEY);
String projectId = ConfigurationUtil.getMandatoryConfig(config,
BigQueryConfiguration.PROJECT_ID);
Credentials credentials = GCPUtils.loadCredentialsFromConf(config);
return GCPUtils.getBigQuery(projectId, credentials);
}

@@ -17,14 +17,16 @@
package io.cdap.plugin.gcp.bigquery.source;

import com.google.api.client.auth.oauth2.Credential;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.BigQueryFactory;
import com.google.cloud.hadoop.util.AccessTokenProviderClassFromConfigFactory;
import com.google.cloud.hadoop.util.CredentialFromAccessTokenProviderClassFactory;
import com.google.cloud.hadoop.util.HadoopCredentialConfiguration;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.Collections;
import java.util.List;

/**
@@ -41,18 +43,16 @@ public BigQueryFactoryWithScopes(List<String> scopes) {
@Override
public Credential createBigQueryCredential(Configuration config) throws GeneralSecurityException, IOException {
Credential credential =
CredentialFromAccessTokenProviderClassFactory.credential(
new AccessTokenProviderClassFromConfigFactory().withOverridePrefix("mapred.bq"),
config,
scopes);
CredentialFromAccessTokenProviderClassFactory.credential(
config,
Collections.singletonList(BigQueryConfiguration.BIGQUERY_CONFIG_PREFIX),
scopes);
if (credential != null) {
return credential;
}

return HadoopCredentialConfiguration.newBuilder()
.withConfiguration(config)
.withOverridePrefix(BIGQUERY_CONFIG_PREFIX)
.build()
.getCredential(scopes);
return HadoopCredentialConfiguration.getCredentialFactory(
config, String.valueOf(ImmutableList.of(BigQueryConfiguration.BIGQUERY_CONFIG_PREFIX)))
.getCredential(BIGQUERY_OAUTH_SCOPES);
}
}

@@ -18,6 +18,7 @@
import com.google.cloud.hadoop.io.bigquery.BigQueryUtils;
import com.google.cloud.hadoop.io.bigquery.ExportFileFormat;
import com.google.cloud.hadoop.util.ConfigurationUtil;
import com.google.cloud.hadoop.util.HadoopConfigurationProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@@ -36,6 +37,7 @@

import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -95,14 +97,17 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc
} catch (GeneralSecurityException gse) {
throw new IOException("Failed to create BigQuery client", gse);
}

List<HadoopConfigurationProperty<?>> hadoopConfigurationProperties = new ArrayList<>(
BigQueryConfiguration.MANDATORY_CONFIG_PROPERTIES_INPUT);
Map<String, String> mandatoryConfig = ConfigurationUtil.getMandatoryConfig(
configuration, BigQueryConfiguration.MANDATORY_CONFIG_PROPERTIES_INPUT);
String projectId = mandatoryConfig.get(BigQueryConfiguration.PROJECT_ID_KEY);
String datasetProjectId = mandatoryConfig.get(BigQueryConfiguration.INPUT_PROJECT_ID_KEY);
String datasetId = mandatoryConfig.get(BigQueryConfiguration.INPUT_DATASET_ID_KEY);
String tableName = mandatoryConfig.get(BigQueryConfiguration.INPUT_TABLE_ID_KEY);
configuration, hadoopConfigurationProperties);
String projectId = mandatoryConfig.get(BigQueryConfiguration.PROJECT_ID.getKey());
String datasetProjectId = mandatoryConfig.get(BigQueryConfiguration.INPUT_PROJECT_ID.getKey());
String datasetId = mandatoryConfig.get(BigQueryConfiguration.INPUT_DATASET_ID.getKey());
String tableName = mandatoryConfig.get(BigQueryConfiguration.INPUT_TABLE_ID.getKey());
String serviceAccount = configuration.get(BigQueryConstants.CONFIG_SERVICE_ACCOUNT, null);
Boolean isServiceAccountFilePath = configuration.getBoolean(BigQueryConstants.CONFIG_SERVICE_ACCOUNT_IS_FILE,
boolean isServiceAccountFilePath = configuration.getBoolean(BigQueryConstants.CONFIG_SERVICE_ACCOUNT_IS_FILE,
true);

String partitionFromDate = configuration.get(BigQueryConstants.CONFIG_PARTITION_FROM_DATE, null);
@@ -131,11 +136,11 @@ private void processQuery(JobContext context) throws IOException, InterruptedExc
runQuery(configuration, bigQueryHelper, projectId, exportTableReference, query, location);

// Default values come from BigquerySource config, and can be overridden by config.
configuration.set(BigQueryConfiguration.INPUT_PROJECT_ID_KEY,
configuration.set(BigQueryConfiguration.INPUT_PROJECT_ID.getKey(),
configuration.get(BigQueryConstants.CONFIG_VIEW_MATERIALIZATION_PROJECT));
configuration.set(BigQueryConfiguration.INPUT_DATASET_ID_KEY,
configuration.set(BigQueryConfiguration.INPUT_DATASET_ID.getKey(),
configuration.get(BigQueryConstants.CONFIG_VIEW_MATERIALIZATION_DATASET));
configuration.set(BigQueryConfiguration.INPUT_TABLE_ID_KEY, temporaryTableName);
configuration.set(BigQueryConfiguration.INPUT_TABLE_ID.getKey(), temporaryTableName);
}
}


@@ -174,9 +174,10 @@ public static Configuration getBigQueryConfig(@Nullable String serviceAccountInf
configuration.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS");
configuration.set("fs.gs.project.id", projectId);
configuration.set("fs.gs.working.dir", GCSPath.ROOT_DIR);
configuration.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);
configuration.set(BigQueryConfiguration.PROJECT_ID.getKey(), projectId);
if (cmekKeyName != null) {
configuration.set(BigQueryConfiguration.OUTPUT_TABLE_KMS_KEY_NAME_KEY, cmekKeyName.toString());
configuration.set(BigQueryConfiguration.OUTPUT_TABLE_KMS_KEY_NAME.getKey(),
cmekKeyName.toString());
}
return configuration;
}
8 changes: 4 additions & 4 deletions src/main/java/io/cdap/plugin/gcp/common/GCPUtils.java
@@ -26,8 +26,8 @@
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS;
import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
import com.google.cloud.hadoop.util.AccessTokenProviderClassFromConfigFactory;
import com.google.cloud.hadoop.util.CredentialFactory;
import com.google.cloud.hadoop.util.HadoopCredentialConfiguration;
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
@@ -129,7 +129,7 @@ public static InputStream openServiceAccount(String serviceAccount, boolean isFi
*/
public static Map<String, String> generateGCSAuthProperties(@Nullable String serviceAccount,
String serviceAccountType) {
return generateAuthProperties(serviceAccount, serviceAccountType, CredentialFactory.GCS_SCOPES, GCS_PREFIX);
return generateAuthProperties(serviceAccount, serviceAccountType, CredentialFactory.DEFAULT_SCOPES, GCS_PREFIX);
}

/**
@@ -142,7 +142,7 @@ public static Map<String, String> generateGCSAuthProperties(@Nullable String ser
*/
public static Map<String, String> generateBigQueryAuthProperties(@Nullable String serviceAccount,
String serviceAccountType) {
List<String> scopes = new ArrayList<>(CredentialFactory.GCS_SCOPES);
List<String> scopes = new ArrayList<>(CredentialFactory.DEFAULT_SCOPES);
scopes.addAll(BIGQUERY_SCOPES);
return generateAuthProperties(serviceAccount, serviceAccountType, scopes, GCS_PREFIX, BQ_PREFIX);
}
@@ -176,7 +176,7 @@ private static Map<String, String> generateAuthProperties(@Nullable String servi
// mapred.bq.auth.access.token.provider.impl
// for use by GCS and BQ.
for (String prefix : prefixes) {
properties.put(prefix + AccessTokenProviderClassFromConfigFactory.ACCESS_TOKEN_PROVIDER_IMPL_SUFFIX,
properties.put(prefix + HadoopCredentialConfiguration.ACCESS_TOKEN_PROVIDER_IMPL_SUFFIX,
ServiceAccountAccessTokenProvider.class.getName());
}
return properties;
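The loop above builds the fully qualified provider property by concatenating each service prefix with the connector's ACCESS_TOKEN_PROVIDER_IMPL_SUFFIX, yielding for example "mapred.bq.auth.access.token.provider.impl" as the in-code comment notes. A standalone sketch of that composition; the "fs.gs" / "mapred.bq" prefixes and the provider class name below are illustrative stand-ins, not constants verified from this repository:

import com.google.cloud.hadoop.util.HadoopCredentialConfiguration;

import java.util.HashMap;
import java.util.Map;

public class AuthPropertySketch {
  public static void main(String[] args) {
    Map<String, String> properties = new HashMap<>();
    // Compose "<prefix>" + ACCESS_TOKEN_PROVIDER_IMPL_SUFFIX for each service prefix,
    // mirroring generateAuthProperties above; the prefixes here are assumed examples.
    for (String prefix : new String[] {"fs.gs", "mapred.bq"}) {
      properties.put(prefix + HadoopCredentialConfiguration.ACCESS_TOKEN_PROVIDER_IMPL_SUFFIX,
          "com.example.MyAccessTokenProvider");
    }
    properties.forEach((key, value) -> System.out.println(key + " = " + value));
  }
}
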

@@ -382,7 +382,7 @@ private Configuration getBaseConfiguration(@Nullable CryptoKeyName cmekKey) thro
cmekKey, config.getServiceAccountType());
baseConfiguration.setBoolean(BigQueryConstants.CONFIG_ALLOW_SCHEMA_RELAXATION,
config.isUpdateTableSchema());
baseConfiguration.setStrings(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION_KEY,
baseConfiguration.setStrings(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION.getKey(),
config.getWriteDisposition().name());
// this setting is needed because gcs has default chunk size of 64MB. This is large default chunk size which can
// cause OOM issue if there are many tables being written. See this - CDAP-16670

@@ -69,8 +69,8 @@ private BigQueryOutputFormat.BigQueryOutputCommitter initMocks(String operation)

Configuration conf = new Configuration();
conf.set("mapred.bq.output.project.id", "test_project");
conf.set(BigQueryConfiguration.OUTPUT_DATASET_ID_KEY, "test_dataset");
conf.set(BigQueryConfiguration.OUTPUT_TABLE_ID_KEY, "test_table");
conf.set(BigQueryConfiguration.OUTPUT_DATASET_ID.getKey(), "test_dataset");
conf.set(BigQueryConfiguration.OUTPUT_TABLE_ID.getKey(), "test_table");
conf.set("mapred.bq.output.gcs.fileformat", BigQueryFileFormat.AVRO.toString());
conf.set(BigQueryConstants.CONFIG_OPERATION, operation);

