diff --git a/sdks/java/extensions/schemaio-expansion-service/build.gradle b/sdks/java/extensions/schemaio-expansion-service/build.gradle
index d23330d73c22..246c0c155cbd 100644
--- a/sdks/java/extensions/schemaio-expansion-service/build.gradle
+++ b/sdks/java/extensions/schemaio-expansion-service/build.gradle
@@ -32,8 +32,14 @@ applyJavaNature(
dependencies {
implementation project(path: ":sdks:java:expansion-service")
permitUnusedDeclared project(path: ":sdks:java:expansion-service") // BEAM-11761
+ implementation project(":sdks:java:extensions:google-cloud-platform-core")
+ permitUnusedDeclared project(path: ":sdks:java:extensions:google-cloud-platform-core") // BEAM-11761
+ implementation project(":sdks:java:io:csv")
+ permitUnusedDeclared project(path: ":sdks:java:io:csv") // BEAM-11761
implementation project(":sdks:java:io:jdbc")
permitUnusedDeclared project(":sdks:java:io:jdbc") // BEAM-11761
+ implementation project(":sdks:java:io:json")
+ permitUnusedDeclared project(path: ":sdks:java:io:json") // BEAM-11761
implementation library.java.postgres
permitUnusedDeclared library.java.postgres // BEAM-11761
implementation project(path: ":model:pipeline", configuration: "shadow")
diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java
new file mode 100644
index 000000000000..4e07a06197f5
--- /dev/null
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/CsvWriteTransformProvider.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv.providers;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.io.WriteFilesResult;
+import org.apache.beam.sdk.io.csv.CsvIO;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.apache.commons.csv.CSVFormat;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for {@link CsvIO#write}.
+ *
+ *
Internal only: This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class CsvWriteTransformProvider
+ extends TypedSchemaTransformProvider {
+ private static final String INPUT_ROWS_TAG = "input";
+ private static final String WRITE_RESULTS = "output";
+
+ @Override
+ protected Class configurationClass() {
+ return CsvWriteConfiguration.class;
+ }
+
+ @Override
+ protected SchemaTransform from(CsvWriteConfiguration configuration) {
+ return new CsvWriteTransform(configuration);
+ }
+
+ @Override
+ public String identifier() {
+ return String.format("beam:schematransform:org.apache.beam:csv_write:v1");
+ }
+
+ @Override
+ public List inputCollectionNames() {
+ return Collections.singletonList(INPUT_ROWS_TAG);
+ }
+
+ @Override
+ public List outputCollectionNames() {
+ return Collections.singletonList(WRITE_RESULTS);
+ }
+
+ /** Configuration for writing to BigQuery with Storage Write API. */
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ public abstract static class CsvWriteConfiguration {
+
+ public void validate() {
+ checkArgument(
+ !Strings.isNullOrEmpty(this.getPath()), "Path for a CSV Write must be specified.");
+ }
+
+ public static Builder builder() {
+ return new AutoValue_CsvWriteTransformProvider_CsvWriteConfiguration.Builder();
+ }
+
+ @SchemaFieldDescription("The file path to write to.")
+ public abstract String getPath();
+
+ /** Builder for {@link CsvWriteConfiguration}. */
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ public abstract Builder setPath(String path);
+
+ /** Builds a {@link CsvWriteConfiguration} instance. */
+ public abstract CsvWriteConfiguration build();
+ }
+ }
+
+ /** A {@link SchemaTransform} for {@link CsvIO#write}. */
+ protected static class CsvWriteTransform extends SchemaTransform {
+
+ private final CsvWriteConfiguration configuration;
+
+ CsvWriteTransform(CsvWriteConfiguration configuration) {
+ configuration.validate();
+ this.configuration = configuration;
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ WriteFilesResult> result =
+ input
+ .get(INPUT_ROWS_TAG)
+ .apply(CsvIO.writeRows(configuration.getPath(), CSVFormat.DEFAULT).withSuffix(""));
+ Schema outputSchema = Schema.of(Field.of("filename", FieldType.STRING));
+ return PCollectionRowTuple.of(
+ WRITE_RESULTS,
+ result
+ .getPerDestinationOutputFilenames()
+ .apply(
+ "Collect filenames",
+ MapElements.into(TypeDescriptors.rows())
+ .via(
+ (destinationAndRow) ->
+ Row.withSchema(outputSchema)
+ .withFieldValue("filename", destinationAndRow.getValue())
+ .build()))
+ .setRowSchema(outputSchema));
+ }
+ }
+}
diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/package-info.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/package-info.java
new file mode 100644
index 000000000000..646e69b7cb8c
--- /dev/null
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/providers/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Transforms for reading and writing CSV files. */
+package org.apache.beam.sdk.io.csv.providers;
diff --git a/sdks/java/io/json/build.gradle b/sdks/java/io/json/build.gradle
new file mode 100644
index 000000000000..fe1f607a3696
--- /dev/null
+++ b/sdks/java/io/json/build.gradle
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(
+ automaticModuleName: 'org.apache.beam.sdk.io.json'
+)
+
+description = "Apache Beam :: SDKs :: Java :: IO :: JSON"
+ext.summary = "IO to read and write JSON files."
+
+dependencies {
+ implementation project(path: ":sdks:java:core", configuration: "shadow")
+ implementation library.java.vendored_guava_32_1_2_jre
+ implementation library.java.everit_json_schema
+ testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
+ testImplementation library.java.junit
+ testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
+ testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration")
+}
\ No newline at end of file
diff --git a/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/JsonIO.java b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/JsonIO.java
new file mode 100644
index 000000000000..3abb29a80427
--- /dev/null
+++ b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/JsonIO.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.json;
+
+import static org.apache.beam.sdk.values.TypeDescriptors.rows;
+import static org.apache.beam.sdk.values.TypeDescriptors.strings;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.io.Compression;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.ShardNameTemplate;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.WriteFiles;
+import org.apache.beam.sdk.io.WriteFilesResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.JsonUtils;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * {@link PTransform}s for reading and writing JSON files.
+ *
+ * Reading JSON files
+ *
+ * Reading from JSON files is not yet implemented in Java. Please see https://github.com/apache/beam/issues/24552.
+ *
+ *
Writing JSON files
+ *
+ * To write a {@link PCollection} to one or more line-delimited JSON files, use {@link
+ * JsonIO.Write}, using{@link JsonIO#writeRows} or {@link JsonIO#write}. {@link JsonIO.Write}
+ * supports writing {@link Row} or custom Java types using an inferred {@link Schema}. Examples
+ * below show both scenarios. See the Beam Programming Guide on inferring
+ * schemas for more information on how to enable Beam to infer a {@link Schema} from a custom
+ * Java type.
+ *
+ *
Example usage:
+ *
+ * Suppose we have the following Transaction
class annotated with
+ * {@code @DefaultSchema(JavaBeanSchema.class)} so that Beam can infer its {@link Schema}:
+ *
+ *
{@code @DefaultSchema(JavaBeanSchema.class)
+ * public class Transaction {
+ * public Transaction() { … }
+ * public Long getTransactionId();
+ * public void setTransactionId(Long transactionId) { … }
+ * public String getBank() { … }
+ * public void setBank(String bank) { … }
+ * public double getPurchaseAmount() { … }
+ * public void setPurchaseAmount(double purchaseAmount) { … }
+ * }
+ * }
+ *
+ * From a {@code PCollection}, {@link JsonIO.Write} can write one or many JSON
+ * files.
+ *
+ * {@code
+ * PCollection transactions = ...
+ * transactions.apply(JsonIO.write("path/to/folder/prefix"));
+ * }
+ *
+ * The resulting JSON files will look like the following where the header is repeated for every
+ * file, whereas by default, {@link JsonIO.Write} will write all fields in sorted order of
+ * the field names.
+ *
+ *
{@code
+ * {"bank": "A", "purchaseAmount": 10.23, "transactionId": 12345}
+ * {"bank": "B", "purchaseAmount": 54.65, "transactionId": 54321}
+ * {"bank": "C", "purchaseAmount": 11,76, "transactionId": 98765}
+ * }
+ *
+ * A {@link PCollection} of {@link Row}s works just like custom Java types illustrated above,
+ * except we use {@link JsonIO#writeRows} as shown below for the same {@code Transaction} class. We
+ * derive {@code Transaction}'s {@link Schema} using a {@link
+ * org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider}. Note that
+ * hard-coding the {@link Row}s below is for illustration purposes. Developers are instead
+ * encouraged to take advantage of {@link
+ * org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider#toRowFunction}.
+ *
+ *
{@code
+ * DefaultSchemaProvider defaultSchemaProvider = new DefaultSchemaProvider();
+ * Schema schema = defaultSchemaProvider.schemaFor(TypeDescriptor.of(Transaction.class));
+ * PCollection transactions = pipeline.apply(Create.of(
+ * Row
+ * .withSchema(schema)
+ * .withFieldValue("bank", "A")
+ * .withFieldValue("purchaseAmount", 10.23)
+ * .withFieldValue("transactionId", "12345")
+ * .build(),
+ * Row
+ * .withSchema(schema)
+ * .withFieldValue("bank", "B")
+ * .withFieldValue("purchaseAmount", 54.65)
+ * .withFieldValue("transactionId", "54321")
+ * .build(),
+ * Row
+ * .withSchema(schema)
+ * .withFieldValue("bank", "C")
+ * .withFieldValue("purchaseAmount", 11.76)
+ * .withFieldValue("transactionId", "98765")
+ * .build()
+ * );
+ *
+ * transactions.apply(
+ * JsonIO
+ * .writeRowsTo("gs://bucket/path/to/folder/prefix")
+ * );
+ * }
+ *
+ * Writing the transactions {@link PCollection} of {@link Row}s would yield the following JSON
+ * file content.
+ *
+ *
{@code
+ * {"bank": "A", "purchaseAmount": 10.23, "transactionId": 12345}
+ * {"bank": "B", "purchaseAmount": 54.65, "transactionId": 54321}
+ * {"bank": "C", "purchaseAmount": 11,76, "transactionId": 98765}
+ * }
+ */
+public class JsonIO {
+ static final String DEFAULT_FILENAME_SUFFIX = ".json";
+
+ /** Instantiates a {@link Write} for writing user types in {@link JSONFormat} format. */
+ public static Write write(String to) {
+ return new AutoValue_JsonIO_Write.Builder()
+ .setTextIOWrite(createDefaultTextIOWrite(to))
+ .build();
+ }
+
+ /** Instantiates a {@link Write} for writing {@link Row}s in {@link JSONFormat} format. */
+ public static Write writeRows(String to) {
+ return new AutoValue_JsonIO_Write.Builder()
+ .setTextIOWrite(createDefaultTextIOWrite(to))
+ .build();
+ }
+
+ /** {@link PTransform} for writing JSON files. */
+ @AutoValue
+ public abstract static class Write
+ extends PTransform, WriteFilesResult> {
+
+ /** Specifies the {@link Compression} of all generated shard files. */
+ public Write withCompression(Compression compression) {
+ return toBuilder().setTextIOWrite(getTextIOWrite().withCompression(compression)).build();
+ }
+
+ /** Whether to skip the spilling of data. See {@link WriteFiles#withNoSpilling}. */
+ public Write withNoSpilling() {
+ return toBuilder().setTextIOWrite(getTextIOWrite().withNoSpilling()).build();
+ }
+
+ /**
+ * Specifies to use a given fixed number of shards per window. See {@link
+ * TextIO.Write#withNumShards}.
+ */
+ public Write withNumShards(Integer numShards) {
+ return toBuilder().setTextIOWrite(getTextIOWrite().withNumShards(numShards)).build();
+ }
+
+ /**
+ * Forces a single file as output and empty shard name template. See {@link
+ * TextIO.Write#withoutSharding}.
+ */
+ public Write withoutSharding() {
+ return toBuilder().setTextIOWrite(getTextIOWrite().withoutSharding()).build();
+ }
+
+ /**
+ * Uses the given {@link ShardNameTemplate} for naming output files. See {@link
+ * TextIO.Write#withShardNameTemplate}.
+ */
+ public Write withShardTemplate(String shardTemplate) {
+ return toBuilder()
+ .setTextIOWrite(getTextIOWrite().withShardNameTemplate(shardTemplate))
+ .build();
+ }
+
+ /** Configures the filename suffix for written files. See {@link TextIO.Write#withSuffix}. */
+ public Write withSuffix(String suffix) {
+ return toBuilder().setTextIOWrite(getTextIOWrite().withSuffix(suffix)).build();
+ }
+
+ /**
+ * Set the base directory used to generate temporary files. See {@link
+ * TextIO.Write#withTempDirectory}.
+ */
+ public Write withTempDirectory(ResourceId tempDirectory) {
+ return toBuilder().setTextIOWrite(getTextIOWrite().withTempDirectory(tempDirectory)).build();
+ }
+
+ /**
+ * Preserves windowing of input elements and writes them to files based on the element's window.
+ * See {@link TextIO.Write#withWindowedWrites}.
+ */
+ public Write withWindowedWrites() {
+ return toBuilder().setTextIOWrite(getTextIOWrite().withWindowedWrites()).build();
+ }
+
+ /**
+ * Returns a transform for writing to text files like this one but that has the given {@link
+ * FileBasedSink.WritableByteChannelFactory} to be used by the {@link FileBasedSink} during
+ * output. See {@link TextIO.Write#withWritableByteChannelFactory}.
+ */
+ public Write withWritableByteChannelFactory(
+ FileBasedSink.WritableByteChannelFactory writableByteChannelFactory) {
+ return toBuilder()
+ .setTextIOWrite(
+ getTextIOWrite().withWritableByteChannelFactory(writableByteChannelFactory))
+ .build();
+ }
+
+ /** The underlying {@link FileIO.Write} that writes converted input to JSON formatted output. */
+ abstract TextIO.Write getTextIOWrite();
+
+ abstract Builder toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+
+ /**
+ * The underlying {@link FileIO.Write} that writes converted input to JSON formatted output.
+ */
+ abstract Builder setTextIOWrite(TextIO.Write value);
+
+ abstract Write autoBuild();
+
+ final Write build() {
+ return autoBuild();
+ }
+ }
+
+ @Override
+ public WriteFilesResult expand(PCollection input) {
+ if (!input.hasSchema()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "%s requires an input Schema. Note that only Row or user classes are supported. Consider using TextIO or FileIO directly when writing primitive types",
+ Write.class.getName()));
+ }
+
+ Schema schema = input.getSchema();
+
+ RowCoder rowCoder = RowCoder.of(schema);
+
+ PCollection rows =
+ input
+ .apply("To Rows", MapElements.into(rows()).via(input.getToRowFunction()))
+ .setCoder(rowCoder);
+
+ SerializableFunction toJsonFn =
+ JsonUtils.getRowToJsonStringsFunction(input.getSchema());
+
+ PCollection json = rows.apply("To JSON", MapElements.into(strings()).via(toJsonFn));
+
+ return json.apply("Write JSON", getTextIOWrite().withOutputFilenames());
+ }
+ }
+
+ private static TextIO.Write createDefaultTextIOWrite(String to) {
+ return TextIO.write().to(to).withSuffix(DEFAULT_FILENAME_SUFFIX);
+ }
+}
diff --git a/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/package-info.java b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/package-info.java
new file mode 100644
index 000000000000..1ee191835713
--- /dev/null
+++ b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Transforms for reading and writing JSON files. */
+package org.apache.beam.sdk.io.json;
diff --git a/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java
new file mode 100644
index 000000000000..9e030821e5ca
--- /dev/null
+++ b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/JsonWriteTransformProvider.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.json.providers;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.io.WriteFilesResult;
+import org.apache.beam.sdk.io.json.JsonIO;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for {@link JsonIO#write}.
+ *
+ * Internal only: This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@AutoService(SchemaTransformProvider.class)
+public class JsonWriteTransformProvider
+ extends TypedSchemaTransformProvider {
+ private static final String INPUT_ROWS_TAG = "input";
+ private static final String WRITE_RESULTS = "output";
+
+ @Override
+ protected Class configurationClass() {
+ return JsonWriteConfiguration.class;
+ }
+
+ @Override
+ protected SchemaTransform from(JsonWriteConfiguration configuration) {
+ return new JsonWriteTransform(configuration);
+ }
+
+ @Override
+ public String identifier() {
+ return String.format("beam:schematransform:org.apache.beam:json_write:v1");
+ }
+
+ @Override
+ public List inputCollectionNames() {
+ return Collections.singletonList(INPUT_ROWS_TAG);
+ }
+
+ @Override
+ public List outputCollectionNames() {
+ return Collections.singletonList(WRITE_RESULTS);
+ }
+
+ /** Configuration for writing to BigQuery with Storage Write API. */
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ public abstract static class JsonWriteConfiguration {
+
+ public void validate() {
+ checkArgument(
+ !Strings.isNullOrEmpty(this.getPath()), "Path for a JSON Write must be specified.");
+ }
+
+ public static Builder builder() {
+ return new AutoValue_JsonWriteTransformProvider_JsonWriteConfiguration.Builder();
+ }
+
+ @SchemaFieldDescription("The file path to write to.")
+ public abstract String getPath();
+
+ /** Builder for {@link JsonWriteConfiguration}. */
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ public abstract Builder setPath(String path);
+
+ /** Builds a {@link JsonWriteConfiguration} instance. */
+ public abstract JsonWriteConfiguration build();
+ }
+ }
+
+ /** A {@link SchemaTransform} for {@link JsonIO#write}. */
+ protected static class JsonWriteTransform extends SchemaTransform {
+
+ private final JsonWriteConfiguration configuration;
+
+ JsonWriteTransform(JsonWriteConfiguration configuration) {
+ configuration.validate();
+ this.configuration = configuration;
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ WriteFilesResult> result =
+ input.get(INPUT_ROWS_TAG).apply(JsonIO.writeRows(configuration.getPath()).withSuffix(""));
+ Schema outputSchema = Schema.of(Field.of("filename", FieldType.STRING));
+ return PCollectionRowTuple.of(
+ WRITE_RESULTS,
+ result
+ .getPerDestinationOutputFilenames()
+ .apply(
+ "Collect filenames",
+ MapElements.into(TypeDescriptors.rows())
+ .via(
+ (destinationAndRow) ->
+ Row.withSchema(outputSchema)
+ .withFieldValue("filename", destinationAndRow.getValue())
+ .build()))
+ .setRowSchema(outputSchema));
+ }
+ }
+}
diff --git a/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/package-info.java b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/package-info.java
new file mode 100644
index 000000000000..312454f8733b
--- /dev/null
+++ b/sdks/java/io/json/src/main/java/org/apache/beam/sdk/io/json/providers/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Transforms for reading and writing JSON files. */
+package org.apache.beam.sdk.io.json.providers;
diff --git a/sdks/java/io/json/src/test/java/org/apache/beam/sdk/io/json/JsonIOWriteTest.java b/sdks/java/io/json/src/test/java/org/apache/beam/sdk/io/json/JsonIOWriteTest.java
new file mode 100644
index 000000000000..71fdcd6b3d94
--- /dev/null
+++ b/sdks/java/io/json/src/test/java/org/apache/beam/sdk/io/json/JsonIOWriteTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.json;
+
+import static org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.allPrimitiveDataTypes;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.common.SchemaAwareJavaBeans.AllPrimitiveDataTypes;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.SerializableMatcher;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link JsonIO.Write}. */
+@RunWith(JUnit4.class)
+public class JsonIOWriteTest {
+ @Rule public TestPipeline writePipeline = TestPipeline.create();
+
+ @Rule public TestPipeline readPipeline = TestPipeline.create();
+
+ @Rule
+ public TestPipeline errorPipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+ @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Test
+ public void writesUserDefinedTypes() {
+ File folder =
+ createFolder(AllPrimitiveDataTypes.class.getSimpleName(), "writesUserDefinedTypes");
+
+ PCollection input =
+ writePipeline.apply(
+ Create.of(
+ allPrimitiveDataTypes(false, BigDecimal.TEN, 1.0, 1.0f, 1, 1L, "a"),
+ allPrimitiveDataTypes(
+ false, BigDecimal.TEN.add(BigDecimal.TEN), 2.0, 2.0f, 2, 2L, "b"),
+ allPrimitiveDataTypes(
+ false,
+ BigDecimal.TEN.add(BigDecimal.TEN).add(BigDecimal.TEN),
+ 3.0,
+ 3.0f,
+ 3,
+ 3L,
+ "c")));
+
+ input.apply(JsonIO.write(toFilenamePrefix(folder)).withNumShards(1));
+
+ writePipeline.run().waitUntilFinish();
+
+ PAssert.that(readPipeline.apply(TextIO.read().from(toFilenamePrefix(folder) + "*")))
+ .containsInAnyOrder(
+ containsAll(
+ "\"aDouble\":1.0",
+ "\"aFloat\":1.0",
+ "\"aLong\":1",
+ "\"aString\":\"a\"",
+ "\"anInteger\":1",
+ "\"aDecimal\":10",
+ "\"aBoolean\":false"),
+ containsAll(
+ "\"aDouble\":2.0",
+ "\"aFloat\":2.0",
+ "\"aLong\":2",
+ "\"aString\":\"b\"",
+ "\"anInteger\":2",
+ "\"aDecimal\":20",
+ "\"aBoolean\":false"),
+ containsAll(
+ "\"aDouble\":3.0",
+ "\"aFloat\":3.0",
+ "\"aLong\":3",
+ "\"aString\":\"c\"",
+ "\"anInteger\":3",
+ "\"aDecimal\":30",
+ "\"aBoolean\":false"));
+
+ readPipeline.run();
+ }
+
+ private static SerializableMatcher containsAll(String... needles) {
+ class Matcher extends BaseMatcher implements SerializableMatcher {
+ @Override
+ public boolean matches(Object item) {
+ if (!(item instanceof String)) {
+ return false;
+ }
+
+ String haystack = (String) item;
+ for (String needle : needles) {
+ if (!haystack.contains(needle)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("Contains all of: ");
+ description.appendValueList("[", ",", "]", needles);
+ }
+ }
+ return new Matcher();
+ }
+
+ private static String toFilenamePrefix(File folder) {
+ checkArgument(folder.isDirectory());
+ return folder.getAbsolutePath() + "/out";
+ }
+
+ private File createFolder(String... paths) {
+ try {
+ return tempFolder.newFolder(paths);
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+}
diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml
index 8a9e0c100496..c4748483b04b 100644
--- a/sdks/python/apache_beam/yaml/standard_io.yaml
+++ b/sdks/python/apache_beam/yaml/standard_io.yaml
@@ -105,3 +105,10 @@
'WriteToParquet': 'apache_beam.io.WriteToParquet'
'ReadFromAvro': 'apache_beam.io.ReadFromAvro'
'WriteToAvro': 'apache_beam.io.WriteToAvro'
+
+- type: beamJar
+ transforms:
+ 'WriteToCsv': 'beam:schematransform:org.apache.beam:csv_write:v1'
+ 'WriteToJson': 'beam:schematransform:org.apache.beam:json_write:v1'
+ config:
+ gradle_target: 'sdks:java:extensions:schemaio-expansion-service:shadowJar'
diff --git a/settings.gradle.kts b/settings.gradle.kts
index f4901d7df92b..c370c5da27d1 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -244,6 +244,7 @@ include(":sdks:java:io:hbase")
include(":sdks:java:io:hcatalog")
include(":sdks:java:io:jdbc")
include(":sdks:java:io:jms")
+include(":sdks:java:io:json")
include(":sdks:java:io:kafka")
include(":sdks:java:io:kinesis")
include(":sdks:java:io:kinesis:expansion-service")