diff --git a/plugins/core-plugin/src/main/resources/Dockerfile-template-yaml b/plugins/core-plugin/src/main/resources/Dockerfile-template-yaml index b254409cf0..24b56bf343 100644 --- a/plugins/core-plugin/src/main/resources/Dockerfile-template-yaml +++ b/plugins/core-plugin/src/main/resources/Dockerfile-template-yaml @@ -21,7 +21,9 @@ RUN if ! [ -f requirements.txt ] ; then echo "$BEAM_PACKAGE" > requirements.txt # Install dependencies to launch the pipeline and download to reduce startup time # Remove Jinja2 dependency once YAML templatization support is added to Beam +# TODO - remove `pip uninstall apache-beam` line when repo is upgraded to Beam 2.59.0 RUN python -m venv /venv \ + && /venv/bin/pip uninstall apache-beam -y \ && /venv/bin/pip install --no-cache-dir --upgrade pip setuptools \ && /venv/bin/pip install --no-cache-dir -U -r $REQUIREMENTS_FILE \ && /venv/bin/pip install --no-cache-dir -U Jinja2 \ diff --git a/python/pom.xml b/python/pom.xml index e437a111a1..63996fbf94 100644 --- a/python/pom.xml +++ b/python/pom.xml @@ -36,6 +36,9 @@ 11 11 + + + 2.57.0 diff --git a/python/src/test/java/com/google/cloud/teleport/templates/python/PubSubToBigQueryYamlIT.java b/python/src/test/java/com/google/cloud/teleport/templates/python/PubSubToBigQueryYamlIT.java new file mode 100644 index 0000000000..c4df48cd6b --- /dev/null +++ b/python/src/test/java/com/google/cloud/teleport/templates/python/PubSubToBigQueryYamlIT.java @@ -0,0 +1,150 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.templates.python; + +import static org.apache.beam.it.gcp.bigquery.matchers.BigQueryAsserts.assertThatBigQueryRecords; +import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline; +import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult; + +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableResult; +import com.google.cloud.teleport.metadata.TemplateIntegrationTest; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.Resources; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.TopicName; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import org.apache.beam.it.common.PipelineLauncher.LaunchConfig; +import org.apache.beam.it.common.PipelineLauncher.LaunchInfo; +import org.apache.beam.it.common.PipelineOperator.Result; +import org.apache.beam.it.common.utils.ResourceManagerUtils; +import org.apache.beam.it.gcp.TemplateTestBase; +import org.apache.beam.it.gcp.bigquery.BigQueryResourceManager; +import org.apache.beam.it.gcp.bigquery.conditions.BigQueryRowsCheck; +import org.apache.beam.it.gcp.pubsub.PubsubResourceManager; +import org.apache.commons.lang3.RandomStringUtils; +import org.json.JSONObject; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Integration test for {@link YAMLTemplate} Flex template. */ +@Category(TemplateIntegrationTest.class) +@TemplateIntegrationTest(YAMLTemplate.class) +@RunWith(JUnit4.class) +public class PubSubToBigQueryYamlIT extends TemplateTestBase { + + private PubsubResourceManager pubsubResourceManager; + private BigQueryResourceManager bigQueryResourceManager; + + private static final int MESSAGES_COUNT = 10; + private static final String YAML_PIPELINE = "PubSubToBigQueryYamlIT.yaml"; + private static final String YAML_PIPELINE_GCS_PATH = "input/" + YAML_PIPELINE; + + @Before + public void setUp() throws IOException { + pubsubResourceManager = + PubsubResourceManager.builder(testName, PROJECT, credentialsProvider).build(); + bigQueryResourceManager = + BigQueryResourceManager.builder(testName, PROJECT, credentials).build(); + + gcsClient.createArtifact(YAML_PIPELINE_GCS_PATH, createSimpleYamlMessage()); + } + + @After + public void cleanUp() { + ResourceManagerUtils.cleanResources(pubsubResourceManager, bigQueryResourceManager); + } + + @Test + public void testPubSubToBigQuery() throws IOException { + basePubSubToBigQuery(Function.identity()); // no extra parameters + } + + private void basePubSubToBigQuery( + Function paramsAdder) throws IOException { + // Arrange + List bqSchemaFields = + Arrays.asList( + Field.of("id", StandardSQLTypeName.INT64), + Field.of("job", StandardSQLTypeName.STRING), + Field.of("name", StandardSQLTypeName.STRING)); + Schema bqSchema = Schema.of(bqSchemaFields); + + String nameSuffix = RandomStringUtils.randomAlphanumeric(8); + TopicName topic = pubsubResourceManager.createTopic("input-" + nameSuffix); + bigQueryResourceManager.createDataset(REGION); + SubscriptionName subscription = + pubsubResourceManager.createSubscription(topic, "sub-1-" + nameSuffix); + TableId table = bigQueryResourceManager.createTable(testName, bqSchema); + + LaunchConfig.Builder options = + paramsAdder.apply( + LaunchConfig.builder(testName, specPath) + .addParameter("yaml_pipeline_file", getGcsPath(YAML_PIPELINE_GCS_PATH)) + .addParameter( + "jinja_variables", + String.format( + "{" + "\"SUBSCRIPTION\": \"%s\", " + "\"BQ_TABLE\": \"%s\"" + "}", + subscription.toString(), toTableSpecStandard(table)))); + + // Act + LaunchInfo info = launchTemplate(options); + assertThatPipeline(info).isRunning(); + + List> expectedMessages = new ArrayList<>(); + for (int i = 1; i <= MESSAGES_COUNT; i++) { + Map message = Map.of("id", i, "job", testName, "name", "message"); + ByteString messageData = ByteString.copyFromUtf8(new JSONObject(message).toString()); + pubsubResourceManager.publish(topic, ImmutableMap.of(), messageData); + expectedMessages.add(message); + } + + Result result = + pipelineOperator() + .waitForConditionsAndFinish( + createConfig(info), + BigQueryRowsCheck.builder(bigQueryResourceManager, table) + .setMinRows(MESSAGES_COUNT) + .build()); + + // Assert + assertThatResult(result).meetsConditions(); + + TableResult records = bigQueryResourceManager.readTable(table); + + // Make sure record can be read + assertThatBigQueryRecords(records).hasRecordsUnordered(expectedMessages); + } + + private String createSimpleYamlMessage() throws IOException { + return Files.readString(Paths.get(Resources.getResource(YAML_PIPELINE).getPath())); + } +} diff --git a/python/src/test/resources/PubSubToBigQueryYamlIT.yaml b/python/src/test/resources/PubSubToBigQueryYamlIT.yaml new file mode 100644 index 0000000000..98dbda583e --- /dev/null +++ b/python/src/test/resources/PubSubToBigQueryYamlIT.yaml @@ -0,0 +1,21 @@ +pipeline: + type: chain + transforms: + - type: ReadFromPubSub + config: + subscription: {{ SUBSCRIPTION }} + format: json + schema: + type: object + properties: + id: {type: integer} + job: {type: string} + name: {type: string} + - type: WriteToBigQuery + config: + table: {{ BQ_TABLE }} + windowing: + type: fixed + size: 5s +options: + streaming: true \ No newline at end of file