Add CMEK support (#117)
Also remove the previous limitation where you could only insert data into a BQ table that already existed.
jphalip authored Jul 9, 2024
1 parent 7ee1092 commit 486bbee
Showing 19 changed files with 392 additions and 159 deletions.
81 changes: 68 additions & 13 deletions README.md
@@ -278,17 +278,18 @@ You can use the following properties in the `TBLPROPERTIES` clause when you crea

You can set the following Hive/Hadoop configuration properties in your environment:

| Property | Default value | Description |
|---------------------------|---------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `bq.read.data.format` | `arrow` | Data format used for reads from BigQuery. Possible values: `arrow`, `avro`. |
| `bq.temp.gcs.path` | | GCS location for storing temporary Avro files when using the `indirect` write method |
| `bq.write.method` | `direct` | Indicates how to write data to BigQuery. Possible values: `direct` (to directly write to the BigQuery storage API), `indirect` (to stage temporary Avro files to GCS before loading into BigQuery). |
| `bq.work.dir.parent.path` | `${hadoop.tmp.dir}` | Parent path on HDFS where each job creates its temporary work directory |
| `bq.work.dir.name.prefix` | `hive-bq-` | Prefix used for naming the jobs' temporary directories. |
| `materializationProject` | | Project used to temporarily materialize data when reading views. Defaults to the same project as the read view. |
| `materializationDataset` | | Dataset used to temporarily materialize data when reading views. Defaults to the same dataset as the read view. |
| `maxParallelism` | | Maximum initial number of read streams |
| `viewsEnabled` | `false` | Set it to `true` to enable reading views. |
| Property | Default value | Description |
|-------------------------------------|---------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `bq.read.data.format` | `arrow` | Data format used for reads from BigQuery. Possible values: `arrow`, `avro`. |
| `bq.temp.gcs.path` | | GCS location for storing temporary Avro files when using the `indirect` write method |
| `bq.write.method` | `direct` | Indicates how to write data to BigQuery. Possible values: `direct` (to directly write to the BigQuery storage API), `indirect` (to stage temporary Avro files to GCS before loading into BigQuery). |
| `bq.work.dir.parent.path` | `${hadoop.tmp.dir}` | Parent path on HDFS where each job creates its temporary work directory |
| `bq.work.dir.name.prefix` | `hive-bq-` | Prefix used for naming the jobs' temporary directories. |
| `bq.destination.table.kms.key.name` | | Cloud KMS encryption key used to protect the job's destination BigQuery table. Read more in the section on [customer-managed encryption keys](#customer-managed-encryption-keys) |
| `materializationProject` | | Project used to temporarily materialize data when reading views. Defaults to the same project as the read view. |
| `materializationDataset` | | Dataset used to temporarily materialize data when reading views. Defaults to the same dataset as the read view. |
| `maxParallelism` | | Maximum initial number of read streams |
| `viewsEnabled` | `false` | Set it to `true` to enable reading views. |
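
For illustration (this example is not part of the original documentation), such properties can be passed as `--hiveconf` flags when starting a Hive session; the property names come from the table above, while the bucket and key paths below are placeholders:

```sh
# Illustrative sketch only: the GCS bucket and KMS key paths are placeholders.
hive \
  --hiveconf bq.write.method=indirect \
  --hiveconf bq.temp.gcs.path=gs://my-temp-bucket/hive-bq \
  --hiveconf bq.destination.table.kms.key.name=projects/my-project/locations/us/keyRings/my-keyring/cryptoKeys/my-key
```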

## Data Type Mapping

@@ -713,6 +714,24 @@ There are multiple options to override the default behavior and to provide custo
with the `bq.access.token` configuration property. You can generate an access token by running
`gcloud auth application-default print-access-token`.
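
As an illustrative sketch (not part of the original instructions), a freshly generated token could be passed to the connector when starting a Hive session:

```sh
# Illustrative sketch only: supply an access token generated on the fly.
hive --hiveconf bq.access.token="$(gcloud auth application-default print-access-token)"
```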

## Customer-managed encryption key (CMEK)

You can provide a Cloud KMS key to be used to encrypt the destination table, for example when you
run a `CREATE TABLE` statement for a managed table, or when you insert data into a table that
doesn't exist yet. To do so, set the `bq.destination.table.kms.key.name` property to the
fully-qualified name of the desired Cloud KMS key, in the form:
```
projects/<KMS_PROJECT_ID>/locations/<LOCATION>/keyRings/<KEY_RING>/cryptoKeys/<KEY>
```
The BigQuery service account associated with your project requires access to this encryption key.
The table is encrypted with the key only if it is created by the connector. A pre-existing
unencrypted table won't be encrypted just by setting this option.
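
As an illustrative sketch (the key ring and key names are placeholders), the fully-qualified resource name of an existing key can be resolved with `gcloud` and passed to the connector:

```sh
# Illustrative sketch only: "my-key" and "my-keyring" are placeholders.
KMS_KEY_NAME=$(gcloud kms keys describe my-key \
  --keyring my-keyring \
  --location us \
  --format 'value(name)')
hive --hiveconf bq.destination.table.kms.key.name="${KMS_KEY_NAME}"
```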

For further information about using customer-managed encryption keys (CMEK) with BigQuery, see [here](https://cloud.google.com/bigquery/docs/customer-managed-encryption#key_resource_id).

## Known issues and limitations

* The `UPDATE`, `MERGE`, `DELETE`, and `ALTER TABLE` statements are currently not supported.
@@ -766,15 +785,16 @@ Enable the following APIs:
```sh
gcloud services enable \
bigquerystorage.googleapis.com \
bigqueryconnection.googleapis.com
bigqueryconnection.googleapis.com \
cloudkms.googleapis.com
```
#### BigLake setup
Define environment variables:
```sh
export PROJECT=my-gcp-project
export PROJECT=<my-gcp-project>
export BIGLAKE_LOCATION=us
export BIGLAKE_REGION=us-central1
export BIGLAKE_CONNECTION=hive-integration-tests
@@ -807,6 +827,41 @@ export BIGLAKE_SA=$(bq show --connection --format json "${PROJECT}.${BIGLAKE_LOC
gsutil iam ch serviceAccount:${BIGLAKE_SA}:objectViewer gs://${BIGLAKE_BUCKET}
```

#### KMS setup

Create a KMS keyring:

```sh
gcloud kms keyrings create \
integration_tests_keyring \
--location us
```

Then create a key in the keyring:

```sh
gcloud kms keys create integration_tests_key \
--keyring integration_tests_keyring \
--location us \
--purpose "encryption"
```
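
Optionally (this check is not part of the original steps), verify that the key was created:

```sh
gcloud kms keys list \
  --keyring integration_tests_keyring \
  --location us
```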

Obtain the BigQuery service account name:

```sh
BQ_SERVICE_ACCOUNT=$(bq show --encryption_service_account --format json | jq -r ".ServiceAccountID")
```

Assign the Encrypter/Decrypter role to the BigQuery service account:

```sh
gcloud kms keys add-iam-policy-binding \
--project=${PROJECT} \
--member serviceAccount:${BQ_SERVICE_ACCOUNT} \
--role roles/cloudkms.cryptoKeyEncrypterDecrypter \
--location=us \
--keyring=integration_tests_keyring \
integration_tests_key
```
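
Optionally (this check is not part of the original steps), confirm that the binding is in place:

```sh
gcloud kms keys get-iam-policy integration_tests_key \
  --project=${PROJECT} \
  --keyring=integration_tests_keyring \
  --location=us
```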

#### Running the tests

You must use Java version 8, as it's the version that Hive itself uses. Make sure that `JAVA_HOME` points to the Java
3 changes: 3 additions & 0 deletions cloudbuild/cloudbuild.yaml
@@ -57,6 +57,7 @@ steps:
args: ['/workspace/cloudbuild/presubmit.sh', 'integrationtest_hive1']
env:
- 'CODECOV_TOKEN=${_CODECOV_TOKEN}'
- 'BIGQUERY_KMS_KEY_NAME=${_BIGQUERY_KMS_KEY_NAME}'

# 7. Run integration tests for Hive 2
- name: 'gcr.io/$PROJECT_ID/dataproc-hive-bigquery-connector-presubmit'
@@ -66,6 +67,7 @@
args: ['/workspace/cloudbuild/presubmit.sh', 'integrationtest_hive2']
env:
- 'CODECOV_TOKEN=${_CODECOV_TOKEN}'
- 'BIGQUERY_KMS_KEY_NAME=${_BIGQUERY_KMS_KEY_NAME}'

# 8. Run integration tests for Hive 3
- name: 'gcr.io/$PROJECT_ID/dataproc-hive-bigquery-connector-presubmit'
@@ -75,6 +77,7 @@
args: ['/workspace/cloudbuild/presubmit.sh', 'integrationtest_hive3']
env:
- 'CODECOV_TOKEN=${_CODECOV_TOKEN}'
- 'BIGQUERY_KMS_KEY_NAME=${_BIGQUERY_KMS_KEY_NAME}'

# Tests should take under 120 mins
timeout: 7200s
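
If the build is submitted manually rather than through a trigger, the `_BIGQUERY_KMS_KEY_NAME` substitution might be supplied as shown below (an assumption about the workflow; the key path is a placeholder, and other required substitutions such as `_CODECOV_TOKEN` are omitted):

```sh
# Illustrative sketch only: the substitution value is a placeholder KMS key path.
gcloud builds submit \
  --config cloudbuild/cloudbuild.yaml \
  --substitutions _BIGQUERY_KMS_KEY_NAME=projects/my-project/locations/us/keyRings/integration_tests_keyring/cryptoKeys/integration_tests_key
```
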
@@ -62,17 +62,27 @@ public void run(HookContext hookContext) throws Exception {
return;
}

// Parse and analyze the semantics of the Hive query
// Parse and analyze the semantics of the Hive query.
// We have to do this because unfortunately the WriteEntity objects in hookContext.getOutputs()
// are systematically marked as being of type INSERT_OVERWRITE, regardless of whether it is
// an "INSERT OVERWRITE" query or a regular "INSERT" query. This is apparently caused by the
// fact that Hive 1.x.x treats all "non native" tables (i.e. by Hive 1.x.x's definition all
// tables that have a storage handler defined:
// https://github.com/apache/hive/blob/release-1.2.1/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java#L845)
// as INSERT_OVERWRITE:
// https://github.com/apache/hive/blob/release-1.2.1/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java#L12147
// To get around this issue, we parse the query ourselves and try to determine the proper type
// for our purposes (insert or insert overwrite).
QBParseInfo parseInfo;
try {
Configuration conf = hookContext.getConf();
Context context = new Context(conf);
context.setCmd(hookContext.getQueryPlan().getQueryString());
ParseDriver parseDriver = new ParseDriver();
ASTNode tree = parseDriver.parse(hookContext.getQueryPlan().getQueryString(), context);
HiveConf hiveConf = new HiveConf(conf, HiveConf.class);
HiveConf hiveConf = new HiveConf(conf, this.getClass());
SemanticAnalyzer analyzer = new SemanticAnalyzer(hiveConf);
if (tree.getChildren().size() == 0 || tree.getChild(0).getType() != HiveParser.TOK_QUERY) {
if (tree.getChildren().isEmpty() || tree.getChild(0).getType() != HiveParser.TOK_QUERY) {
return;
}
analyzer.analyze((ASTNode) tree.getChild(0), context);
@@ -54,8 +54,8 @@ public Map<String, String> getBasicStatistics(Partish partish) {
Guice.createInjector(
new BigQueryClientModule(),
new HiveBigQueryConnectorModule(conf, hmsTable.getParameters()));
BigQueryClient bqClient = injector.getInstance(BigQueryClient.class);
HiveBigQueryConfig config = injector.getInstance(HiveBigQueryConfig.class);
return BigQueryUtils.getBasicStatistics(bqClient, config.getTableId());
return BigQueryUtils.getBasicStatistics(
injector.getInstance(BigQueryClient.class),
injector.getInstance(HiveBigQueryConfig.class).getTableId());
}
}
@@ -134,21 +134,32 @@ public static void assertDoesNotContainColumn(Table hmsTable, String columnName)
}
}

protected void createBigQueryTable(Table hmsTable, TableInfo bigQueryTableInfo) {
Injector injector =
Guice.createInjector(
new BigQueryClientModule(),
new HiveBigQueryConnectorModule(conf, hmsTable.getParameters()));
HiveBigQueryConfig opts = injector.getInstance(HiveBigQueryConfig.class);
protected void createBigQueryTable(
Injector injector,
TableId tableId,
StandardTableDefinition tableDefinition,
HiveBigQueryConfig opts,
Table hmsTable) {
// TODO: We currently can't use the `BigQueryClient.createTable()` because it doesn't have a way
// to
// pass a TableInfo. This forces us to duplicate some code below from the existing
// `BigQueryClient.createTable()`. One better long-term solution would be to add a
// `createTable(TableInfo)` method to BigQueryClient. See:
// https://github.com/GoogleCloudDataproc/spark-bigquery-connector/issues/1213
TableInfo.Builder bigQueryTableInfo =
TableInfo.newBuilder(tableId, tableDefinition)
.setDescription(hmsTable.getParameters().get("comment"));
opts.getKmsKeyName()
.ifPresent(
keyName ->
bigQueryTableInfo.setEncryptionConfiguration(
EncryptionConfiguration.newBuilder().setKmsKeyName(keyName).build()));
BigQueryCredentialsSupplier credentialsSupplier =
injector.getInstance(BigQueryCredentialsSupplier.class);
HeaderProvider headerProvider = injector.getInstance(HeaderProvider.class);

// TODO: We cannot use the BigQueryClient class here because it doesn't have a
// `create(TableInfo)` method. We could add it to that class eventually.
BigQuery bigQueryService =
BigQueryUtils.getBigQueryService(opts, headerProvider, credentialsSupplier);
bigQueryService.create(bigQueryTableInfo);
bigQueryService.create(bigQueryTableInfo.build());
}

/**
@@ -247,12 +258,7 @@ public void preCreateTable(Table table) throws MetaException {
tableDefBuilder.setTimePartitioning(tpBuilder.build());
}

StandardTableDefinition tableDefinition = tableDefBuilder.build();
TableInfo bigQueryTableInfo =
TableInfo.newBuilder(tableId, tableDefinition)
.setDescription(table.getParameters().get("comment"))
.build();
createBigQueryTable(table, bigQueryTableInfo);
createBigQueryTable(injector, tableId, tableDefBuilder.build(), opts, table);

String hmsDbTableName = HiveUtils.getDbTableName(table);
LOG.info("Created BigQuery table {} for {}", tableId, hmsDbTableName);
@@ -366,6 +372,11 @@ public void rollbackInsertTable(Table table, boolean overwrite) throws MetaExcep
}

public void commitDropTable(Table table, boolean deleteData) throws MetaException {
if (conf.getBoolean(HiveBigQueryConfig.CONNECTOR_IN_TEST, false)
&& conf.getBoolean(HiveBigQueryConfig.FORCE_DROP_FAILURE, false)) {
// For integration testing only
throw new RuntimeException(HiveBigQueryConfig.FORCED_DROP_FAILURE_ERROR_MESSAGE);
}
if (!HiveUtils.isExternalTable(table) && deleteData) {
// This is a managed table, so let's delete the table in BigQuery
Injector injector =
