diff --git a/build.gradle b/build.gradle
index 5c91d3b508..f66f3dc852 100644
--- a/build.gradle
+++ b/build.gradle
@@ -122,6 +122,8 @@ ext.libraries = [
restliCommon: "com.linkedin.pegasus:restli-common:${pegasusVersion}",
rocksdbjni: 'org.rocksdb:rocksdbjni:8.8.1',
samzaApi: 'org.apache.samza:samza-api:1.5.1',
+ beamSdk: 'org.apache.beam:beam-sdks-java-core:2.60.0',
+ beamExtensionAvro: 'org.apache.beam:beam-sdks-java-extensions-avro:2.60.0',
slf4j: 'org.slf4j:slf4j:1.7.36',
slf4jApi: 'org.slf4j:slf4j-api:1.7.36',
slf4jSimple: 'org.slf4j:slf4j-simple:1.7.36',
diff --git a/docs/dev_guide/navigating_project.md b/docs/dev_guide/navigating_project.md
index ba6eaeb239..31ac28cc75 100644
--- a/docs/dev_guide/navigating_project.md
+++ b/docs/dev_guide/navigating_project.md
@@ -22,6 +22,7 @@ The Venice codebase is split across these directories:
minimal Venice-specific logic, and be mostly just glue code to satisfy the contracts expected by the third-party
system. Also, these modules are intended to minimize the dependency burden of the other client libraries. Those
include:
+ - `venice-beam`, which implements the Beam Read API, enabling a Beam job to consume the Venice changelog.
- `venice-pulsar`, which contains an implementation of a Pulsar [Sink](https://pulsar.apache.org/docs/next/io-overview/#sink),
in order to feed data from Pulsar topics to Venice.
- `venice-samza`, which contains an implementation of a Samza [SystemProducer](https://samza.apache.org/learn/documentation/latest/api/javadocs/org/apache/samza/system/SystemProducer.html),
diff --git a/gradle/spotbugs/exclude.xml b/gradle/spotbugs/exclude.xml
index 3725224437..d97431bb02 100644
--- a/gradle/spotbugs/exclude.xml
+++ b/gradle/spotbugs/exclude.xml
@@ -478,4 +478,14 @@
+
+
+
+
+
+
+
+
+
+
diff --git a/integrations/venice-beam/build.gradle b/integrations/venice-beam/build.gradle
new file mode 100644
index 0000000000..ad3ea0ecac
--- /dev/null
+++ b/integrations/venice-beam/build.gradle
@@ -0,0 +1,20 @@
+dependencies {
+ implementation(project(':internal:venice-common')) {
+ exclude module: 'kafka_2.10'
+ exclude group: 'org.scala-lang'
+ }
+ implementation project(':clients:da-vinci-client')
+ implementation project(':clients:venice-thin-client')
+ implementation project(':clients:venice-client')
+
+ implementation libraries.log4j2api
+ implementation libraries.log4j2core
+
+ implementation libraries.beamSdk
+ implementation libraries.beamExtensionAvro
+}
+
+ext {
+ // to be tested in integration test
+ jacocoCoverageThreshold = 0.00
+}
diff --git a/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/CheckPointProperties.java b/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/CheckPointProperties.java
new file mode 100644
index 0000000000..149c0ab430
--- /dev/null
+++ b/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/CheckPointProperties.java
@@ -0,0 +1,34 @@
+package com.linkedin.venice.beam.consumer;
+
+import com.linkedin.davinci.consumer.VeniceChangeCoordinate;
+import java.util.Objects;
+import java.util.Set;
+
+
+/**
+ * Properties used by {@link com.linkedin.davinci.consumer.VeniceChangelogConsumer} to seek
+ * checkpoints.
+ */
+public class CheckPointProperties {
+ private Set coordinates;
+ private long seekTimestamp;
+ private String store;
+
+ public CheckPointProperties(Set coordinates, long seekTimestamp, String store) {
+ this.coordinates = coordinates;
+ this.seekTimestamp = seekTimestamp;
+ this.store = Objects.requireNonNull(store);
+ }
+
+ public Set getCoordinates() {
+ return coordinates;
+ }
+
+ public long getSeekTimestamp() {
+ return seekTimestamp;
+ }
+
+ public String getStore() {
+ return store;
+ }
+}
diff --git a/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/LocalVeniceChangelogConsumerProvider.java b/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/LocalVeniceChangelogConsumerProvider.java
new file mode 100644
index 0000000000..bd528a0672
--- /dev/null
+++ b/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/LocalVeniceChangelogConsumerProvider.java
@@ -0,0 +1,34 @@
+package com.linkedin.venice.beam.consumer;
+
+import com.linkedin.davinci.consumer.VeniceChangelogConsumer;
+import com.linkedin.davinci.consumer.VeniceChangelogConsumerClientFactory;
+import java.lang.reflect.InvocationTargetException;
+
+
+/** Provides a configured {@link VeniceChangelogConsumer} instance. */
+public class LocalVeniceChangelogConsumerProvider implements VeniceChangelogConsumerProvider {
+ private static final long serialVersionUID = 1L;
+
+ private final Class extends VeniceChangelogConsumerClientFactory> _veniceChangelogConsumerClientFactoryClass;
+
+ public LocalVeniceChangelogConsumerProvider(
+ Class extends VeniceChangelogConsumerClientFactory> veniceChangelogConsumerClientFactoryClass) {
+ _veniceChangelogConsumerClientFactoryClass = veniceChangelogConsumerClientFactoryClass;
+ }
+
+ @Override
+ public VeniceChangelogConsumer getVeniceChangelogConsumer(String storeName)
+ throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
+ VeniceChangelogConsumerClientFactory veniceChangelogConsumerClientFactory =
+ this._veniceChangelogConsumerClientFactoryClass.getDeclaredConstructor().newInstance();
+ return veniceChangelogConsumerClientFactory.getChangelogConsumer(storeName);
+ }
+
+ @Override
+ public VeniceChangelogConsumer getVeniceChangelogConsumer(String storeName, String consumerId)
+ throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
+ VeniceChangelogConsumerClientFactory veniceChangelogConsumerClientFactory =
+ this._veniceChangelogConsumerClientFactoryClass.getDeclaredConstructor().newInstance();
+ return veniceChangelogConsumerClientFactory.getChangelogConsumer(storeName, consumerId);
+ }
+}
diff --git a/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/PubSubMessageCoder.java b/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/PubSubMessageCoder.java
new file mode 100644
index 0000000000..9565302fa2
--- /dev/null
+++ b/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/PubSubMessageCoder.java
@@ -0,0 +1,51 @@
+package com.linkedin.venice.beam.consumer;
+
+import com.linkedin.davinci.consumer.ChangeEvent;
+import com.linkedin.davinci.consumer.VeniceChangeCoordinate;
+import com.linkedin.venice.pubsub.api.PubSubMessage;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nonnull;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+
+/** Uses {@link AvroCoder} to encode/decode {@link PubSubMessage}s. */
+public class PubSubMessageCoder
+ extends StructuredCoder, VeniceChangeCoordinate>> {
+ private static final long serialVersionUID = 1L;
+
+ private final AvroCoder, VeniceChangeCoordinate>> pubSubMessageAvroCoder =
+ AvroCoder.of(new TypeDescriptor, VeniceChangeCoordinate>>() {
+ });
+
+ public static PubSubMessageCoder of() {
+ return new PubSubMessageCoder<>();
+ }
+
+ @Override
+ public void encode(PubSubMessage, VeniceChangeCoordinate> value, @Nonnull OutputStream outStream)
+ throws IOException {
+ pubSubMessageAvroCoder.encode(value, outStream);
+ }
+
+ @Override
+ public PubSubMessage, VeniceChangeCoordinate> decode(@Nonnull InputStream inStream)
+ throws IOException {
+ return pubSubMessageAvroCoder.decode(inStream);
+ }
+
+ @Override
+ public @Nonnull List extends Coder>> getCoderArguments() {
+ return Collections.singletonList(pubSubMessageAvroCoder);
+ }
+
+ @Override
+ public void verifyDeterministic() {
+ }
+}
diff --git a/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/VeniceChangelogConsumerIO.java b/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/VeniceChangelogConsumerIO.java
new file mode 100644
index 0000000000..62666d6eff
--- /dev/null
+++ b/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/VeniceChangelogConsumerIO.java
@@ -0,0 +1,609 @@
+package com.linkedin.venice.beam.consumer;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.linkedin.davinci.consumer.ChangeEvent;
+import com.linkedin.davinci.consumer.VeniceChangeCoordinate;
+import com.linkedin.davinci.consumer.VeniceChangelogConsumer;
+import com.linkedin.davinci.consumer.VeniceCoordinateOutOfRangeException;
+import com.linkedin.venice.pubsub.api.PubSubMessage;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.Read.Unbounded;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+
+/**
+ * Beam Connector for Venice Change data capture IO. Uses {@link VeniceChangelogConsumer} underneath
+ * to pull messages from configured venice store.
+ */
+public final class VeniceChangelogConsumerIO {
+ private static final Logger LOG = LogManager.getLogger(VeniceChangelogConsumerIO.class);
+
+ private VeniceChangelogConsumerIO() {
+ }
+
+ public static Read read() {
+ return new Read<>();
+ }
+
+ public static class Read
+ extends PTransform, VeniceChangeCoordinate>>> {
+ private static final long serialVersionUID = 1L;
+
+ public enum SeekWhence
+ implements BiFunction> {
+ CHECKPOINT {
+ @Override
+ public CompletableFuture apply(
+ VeniceChangelogConsumer consumer,
+ CheckPointProperties checkPointProperties) {
+ Set coordinates = checkPointProperties.getCoordinates();
+ LOG.info("Seeking To Coordinates {} for store {}", coordinates, checkPointProperties.getStore());
+ return consumer.seekToCheckpoint(coordinates);
+ }
+ },
+ END_OF_PUSH {
+ @Override
+ public CompletableFuture apply(
+ VeniceChangelogConsumer consumer,
+ CheckPointProperties checkPointProperties) {
+ LOG.info("Seeking To EndOfPush for store {}", checkPointProperties.getStore());
+ return consumer.seekToEndOfPush();
+ }
+ },
+ START_OF_PUSH {
+ @Override
+ public CompletableFuture apply(
+ VeniceChangelogConsumer consumer,
+ CheckPointProperties checkPointProperties) {
+ LOG.info("Seeking To BeginningOfPush for store {}", checkPointProperties.getStore());
+ return consumer.seekToBeginningOfPush();
+ }
+ },
+ TAIL {
+ @Override
+ public CompletableFuture apply(
+ VeniceChangelogConsumer consumer,
+ CheckPointProperties checkPointProperties) {
+ LOG.info("Seeking To Tail for store {}", checkPointProperties.getStore());
+ return consumer.seekToTail();
+ }
+ },
+ TIMESTAMP {
+ // Seeks to given epoch timestamp in milliseconds if positive or rewinds by given
+ // milliseconds from current time if negative.
+ @Override
+ public CompletableFuture apply(
+ VeniceChangelogConsumer consumer,
+ CheckPointProperties checkPointProperties) {
+ long seekTimestamp;
+ if (checkPointProperties.getSeekTimestamp() < 0) {
+ seekTimestamp = System.currentTimeMillis() + checkPointProperties.getSeekTimestamp();
+ } else {
+ seekTimestamp = checkPointProperties.getSeekTimestamp();
+ }
+ LOG.info("Seeking To Timestamp {} for store {}", seekTimestamp, checkPointProperties.getStore());
+ return consumer.seekToTimestamp(seekTimestamp);
+ }
+ };
+ }
+
+ private Set partitions = Collections.emptySet();
+ private Duration pollTimeout = Duration.standardSeconds(1);
+ // Positive timestamp is treated as epoch time in milliseconds, negative timestamp is treated as
+ // rewind time in milliseconds (from current time)
+ private long seekTimestamp;
+ private SeekWhence seekWhence = SeekWhence.CHECKPOINT;
+
+ private String store = "";
+ private String consumerIdSuffix = "";
+ private Duration terminationTimeout = Duration.standardSeconds(30);
+
+ private LocalVeniceChangelogConsumerProvider localVeniceChangelogConsumerProvider;
+
+ public Read() {
+ }
+
+ public Read(Read read) {
+ this.consumerIdSuffix = read.consumerIdSuffix;
+ this.partitions = read.partitions;
+ this.pollTimeout = read.pollTimeout;
+ this.seekWhence = read.seekWhence;
+ this.store = read.store;
+ this.terminationTimeout = read.terminationTimeout;
+ this.seekTimestamp = read.seekTimestamp;
+ this.localVeniceChangelogConsumerProvider = read.localVeniceChangelogConsumerProvider;
+ }
+
+ @Override
+ public PCollection, VeniceChangeCoordinate>> expand(PBegin input) {
+ Source source = new Source<>(this);
+ Unbounded, VeniceChangeCoordinate>> unbounded =
+ org.apache.beam.sdk.io.Read.from(source);
+ return input.getPipeline().apply(unbounded);
+ }
+
+ public Duration getPollTimeout() {
+ return this.pollTimeout;
+ }
+
+ public Read setPollTimeout(Duration timeout) {
+ this.pollTimeout = timeout;
+ return this;
+ }
+
+ public SeekWhence getSeekWhence() {
+ return this.seekWhence;
+ }
+
+ public Read setSeekWhence(SeekWhence seekWhence) {
+ this.seekWhence = seekWhence;
+ return this;
+ }
+
+ public String getStore() {
+ return this.store;
+ }
+
+ public Read setStore(String store) {
+ this.store = store;
+ return this;
+ }
+
+ public Duration getTerminationTimeout() {
+ return this.terminationTimeout;
+ }
+
+ public Read setTerminationTimeout(Duration timeout) {
+ this.terminationTimeout = timeout;
+ return this;
+ }
+
+ public long getSeekTimestamp() {
+ return seekTimestamp;
+ }
+
+ public Read setSeekTimestamp(long seekTimestamp) {
+ this.seekTimestamp = seekTimestamp;
+ return this;
+ }
+
+ public Set getPartitions() {
+ return partitions;
+ }
+
+ public Read setPartitions(Set partitions) {
+ this.partitions = partitions;
+ return this;
+ }
+
+ public LocalVeniceChangelogConsumerProvider getLocalVeniceChangelogConsumerProvider() {
+ return localVeniceChangelogConsumerProvider;
+ }
+
+ public Read setLocalVeniceChangelogConsumerProvider(
+ LocalVeniceChangelogConsumerProvider localVeniceChangelogConsumerProvider) {
+ this.localVeniceChangelogConsumerProvider = localVeniceChangelogConsumerProvider;
+ return this;
+ }
+
+ public String getConsumerIdSuffix() {
+ return consumerIdSuffix;
+ }
+
+ public Read setConsumerIdSuffix(String consumerIdSuffix) {
+ this.consumerIdSuffix = consumerIdSuffix;
+ return this;
+ }
+
+ public RemoveMetadata withoutMetadata() {
+ return new RemoveMetadata();
+ }
+
+ public CurrentValueTransform withOnlyCurrentValue(Coder returnTypeCoder) {
+ return new CurrentValueTransform(returnTypeCoder);
+ }
+
+ class RemoveMetadata extends PTransform>>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public @Nonnull PCollection>> expand(PBegin pBegin) {
+ PCollection, VeniceChangeCoordinate>> input = pBegin.apply(Read.this);
+ return input.apply(
+ MapElements.via(
+ new SimpleFunction, VeniceChangeCoordinate>, KV>>() {
+ @Override
+ public KV> apply(PubSubMessage, VeniceChangeCoordinate> message) {
+ return KV.of(message.getKey(), message.getValue());
+ }
+ }))
+ .setCoder(VeniceMessageCoder.of());
+ }
+ }
+
+ private class CurrentValueTransform extends PTransform>> {
+ private static final long serialVersionUID = 1L;
+ private final Coder> _returnTypeCoder;
+
+ CurrentValueTransform(Coder> returnTypeCoder) {
+ _returnTypeCoder = Objects.requireNonNull(returnTypeCoder);
+ }
+
+ @Override
+ public @Nonnull PCollection> expand(PBegin pBegin) {
+ PCollection>> pCollection = pBegin.apply(new RemoveMetadata());
+ return pCollection.apply(MapElements.via(new SimpleFunction>, KV>() {
+ @Override
+ public KV apply(KV> message) {
+ GenericData.Record value = (GenericData.Record) message.getValue().getCurrentValue();
+ return KV.of(message.getKey(), value);
+ }
+ })).setCoder(_returnTypeCoder);
+ }
+ }
+ }
+
+ private static class Source
+ extends UnboundedSource, VeniceChangeCoordinate>, VeniceCheckpointMark> {
+ private static final long serialVersionUID = 1L;
+ private final Read read;
+
+ Source(Read read) {
+ this.read = read;
+ }
+
+ @Override
+ public @Nonnull List extends Source> split(int desiredNumSplits, @Nonnull PipelineOptions options)
+ throws Exception {
+ Set partitions = this.read.getPartitions();
+ if (partitions.isEmpty()) {
+ partitions =
+ IntStream
+ .range(
+ 0,
+ this.read.localVeniceChangelogConsumerProvider.getVeniceChangelogConsumer(this.read.getStore())
+ .getPartitionCount())
+ .boxed()
+ .collect(Collectors.toSet());
+ LOG.info("Detected store {} has {} partitions", this.read.getStore(), partitions.size());
+ }
+
+ // NOTE: Enforces all splits have the same # of partitions.
+ int numSplits = Math.min(desiredNumSplits, partitions.size());
+ while (partitions.size() % numSplits > 0) {
+ ++numSplits;
+ }
+
+ List> partitionSplits = new ArrayList<>(numSplits);
+ for (int i = 0; i < numSplits; ++i) {
+ partitionSplits.add(new HashSet<>());
+ }
+ for (int partition: partitions) {
+ partitionSplits.get(partition % numSplits).add(partition);
+ }
+
+ List