Skip to content

Commit

Permalink
add KMS key support to BigQueryConverters (#2024)
Browse files Browse the repository at this point in the history
* add KMS key support to BigQueryConverters

Signed-off-by: Jeffrey Kinard <[email protected]>

* address comments

Signed-off-by: Jeffrey Kinard <[email protected]>

---------

Signed-off-by: Jeffrey Kinard <[email protected]>
  • Loading branch information
Polber authored Nov 27, 2024
1 parent b68fe21 commit add3dd7
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
import org.apache.commons.text.StringSubstitutor;
import org.apache.parquet.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -241,6 +242,18 @@ public interface BigQueryReadOptions extends PipelineOptions {
String getQueryTempDataset();

void setQueryTempDataset(String queryTempDataset);

@TemplateParameter.KmsEncryptionKey(
order = 7,
optional = true,
description = "Google Cloud KMS key",
helpText =
"If reading from BigQuery using query source, use this Cloud KMS key to encrypt any temporary tables created.",
example =
"projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key")
String getKMSEncryptionKey();

void setKMSEncryptionKey(String keyName);
}

/**
Expand Down Expand Up @@ -340,11 +353,17 @@ public static <T> ReadBigQuery.Builder<T> newBuilder() {
@Override
public PCollection<T> expand(PBegin pipeline) {

BigQueryIO.TypedRead<T> readFunction = readFunction();

if (!Strings.isNullOrEmpty(options().getKMSEncryptionKey())) {
readFunction = readFunction.withKmsKey(options().getKMSEncryptionKey());
}

if (options().getQuery() == null) {
LOG.info("No query provided, reading directly from: " + options().getInputTableSpec());
return pipeline.apply(
"ReadFromBigQuery",
readFunction()
readFunction
.from(options().getInputTableSpec())
.withTemplateCompatibility()
.withMethod(Method.DIRECT_READ));
Expand All @@ -357,7 +376,7 @@ public PCollection<T> expand(PBegin pipeline) {
LOG.info("Using Standard SQL");
return pipeline.apply(
"ReadFromBigQueryWithQuery",
readFunction()
readFunction
.fromQuery(options().getQuery())
.withTemplateCompatibility()
.withQueryLocation(options().getQueryLocation())
Expand All @@ -368,7 +387,7 @@ public PCollection<T> expand(PBegin pipeline) {
LOG.info("Using Legacy SQL");
return pipeline.apply(
"ReadFromBigQueryWithQuery",
readFunction()
readFunction
.fromQuery(options().getQuery())
.withTemplateCompatibility()
.withQueryLocation(options().getQueryLocation())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.kms.v1.CryptoKey;
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
import java.io.IOException;
import java.util.List;
Expand All @@ -42,6 +43,7 @@
import org.apache.beam.it.gcp.TemplateTestBase;
import org.apache.beam.it.gcp.bigquery.BigQueryResourceManager;
import org.apache.beam.it.gcp.bigquery.utils.BigQueryTestUtil;
import org.apache.beam.it.gcp.kms.KMSResourceManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -57,6 +59,14 @@ public final class BigQueryToElasticsearchIT extends TemplateTestBase {

private BigQueryResourceManager bigQueryClient;
private ElasticsearchResourceManager elasticsearchResourceManager;
private KMSResourceManager kmsResourceManager;

// Used by BigQueryIO to encrypt data in temporary tables
CryptoKey cryptoKey;

private static final String KMS_REGION = "global";
private static final String KEYRING_ID = "BigQueryToElasticsearch";
private static final String CRYPTO_KEY_NAME = "key1";

// Define a set of parameters used to allow configuration of the test size being run.
private static final String BIGQUERY_ID_COL = "test_id";
Expand All @@ -72,11 +82,16 @@ public final class BigQueryToElasticsearchIT extends TemplateTestBase {
public void setup() {
bigQueryClient = BigQueryResourceManager.builder(testName, PROJECT, credentials).build();
elasticsearchResourceManager = ElasticsearchResourceManager.builder(testId).build();
kmsResourceManager =
KMSResourceManager.builder(PROJECT, credentialsProvider).setRegion(KMS_REGION).build();

cryptoKey = kmsResourceManager.getOrCreateCryptoKey(KEYRING_ID, CRYPTO_KEY_NAME);
}

@After
public void tearDown() {
ResourceManagerUtils.cleanResources(bigQueryClient, elasticsearchResourceManager);
ResourceManagerUtils.cleanResources(
bigQueryClient, elasticsearchResourceManager, kmsResourceManager);
}

@Test
Expand Down Expand Up @@ -111,7 +126,7 @@ public void testBigQueryToElasticsearch() throws IOException {
// Assert
assertThatResult(result).isLaunchFinished();

assertThat(elasticsearchResourceManager.count(indexName)).isEqualTo(20);
assertThat(elasticsearchResourceManager.count(indexName)).isEqualTo(BIGQUERY_NUM_ROWS);
assertThatRecords(elasticsearchResourceManager.fetchAll(indexName))
.hasRecordsUnordered(bigQueryRowsToRecords(bigQueryRows));
}
Expand Down Expand Up @@ -143,15 +158,16 @@ public void testBigQueryToElasticsearchQuery() throws IOException {
.addParameter("connectionUrl", elasticsearchResourceManager.getUri())
.addParameter("disableCertificateValidation", "true")
.addParameter("index", indexName)
.addParameter("apiKey", "elastic"));
.addParameter("apiKey", "elastic")
.addParameter("KMSEncryptionKey", cryptoKey.getName()));
assertThatPipeline(info).isRunning();

Result result = pipelineOperator().waitUntilDone(createConfig(info));

// Assert
assertThatResult(result).isLaunchFinished();

assertThat(elasticsearchResourceManager.count(indexName)).isEqualTo(20);
assertThat(elasticsearchResourceManager.count(indexName)).isEqualTo(BIGQUERY_NUM_ROWS);
assertThatRecords(elasticsearchResourceManager.fetchAll(indexName))
.hasRecordsUnordered(bigQueryRowsToRecords(bigQueryRows));
}
Expand Down

0 comments on commit add3dd7

Please sign in to comment.