Skip to content

Commit

Permalink
[beam] VeniceChangelogConsumerIO for Apache Beam integration (#1300)
Browse files Browse the repository at this point in the history
Introduced new VeniceChangelogConsumerIO and related classes so that
Apache Beam stream processing jobs can consume the Venice changelog.

This code is added in a new integrations:venice-beam module.
  • Loading branch information
suketkar authored Nov 22, 2024
1 parent 0040d45 commit 7f553be
Show file tree
Hide file tree
Showing 12 changed files with 913 additions and 0 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ ext.libraries = [
restliCommon: "com.linkedin.pegasus:restli-common:${pegasusVersion}",
rocksdbjni: 'org.rocksdb:rocksdbjni:8.8.1',
samzaApi: 'org.apache.samza:samza-api:1.5.1',
beamSdk: 'org.apache.beam:beam-sdks-java-core:2.60.0',
beamExtensionAvro: 'org.apache.beam:beam-sdks-java-extensions-avro:2.60.0',
slf4j: 'org.slf4j:slf4j:1.7.36',
slf4jApi: 'org.slf4j:slf4j-api:1.7.36',
slf4jSimple: 'org.slf4j:slf4j-simple:1.7.36',
Expand Down
1 change: 1 addition & 0 deletions docs/dev_guide/navigating_project.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The Venice codebase is split across these directories:
minimal Venice-specific logic, and be mostly just glue code to satisfy the contracts expected by the third-party
system. Also, these modules are intended to minimize the dependency burden of the other client libraries. Those
include:
- `venice-beam`, which implements the Beam Read API, enabling a Beam job to consume the Venice changelog.
- `venice-pulsar`, which contains an implementation of a Pulsar [Sink](https://pulsar.apache.org/docs/next/io-overview/#sink),
in order to feed data from Pulsar topics to Venice.
- `venice-samza`, which contains an implementation of a Samza [SystemProducer](https://samza.apache.org/learn/documentation/latest/api/javadocs/org/apache/samza/system/SystemProducer.html),
Expand Down
10 changes: 10 additions & 0 deletions gradle/spotbugs/exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -478,4 +478,14 @@
<Package name="com.linkedin.venice.protocols"/>
<Bug pattern="MS_EXPOSE_REP"/>
</Match>
<Match>
<Bug pattern="SE_INNER_CLASS"/>
<Or>
<Class name="com.linkedin.venice.beam.consumer.PubSubMessageCoder$1"/>
<Class name="com.linkedin.venice.beam.consumer.VeniceMessageCoder$1"/>
<Class name="com.linkedin.venice.beam.consumer.VeniceMessageCoder$2"/>
<Class name="com.linkedin.venice.beam.consumer.VeniceChangelogConsumerIO$Read$CurrentValueTransform"/>
<Class name="com.linkedin.venice.beam.consumer.VeniceChangelogConsumerIO$Read$RemoveMetadata"/>
</Or>
</Match>
</FindBugsFilter>
20 changes: 20 additions & 0 deletions integrations/venice-beam/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
dependencies {
implementation(project(':internal:venice-common')) {
exclude module: 'kafka_2.10'
exclude group: 'org.scala-lang'
}
implementation project(':clients:da-vinci-client')
implementation project(':clients:venice-thin-client')
implementation project(':clients:venice-client')

implementation libraries.log4j2api
implementation libraries.log4j2core

implementation libraries.beamSdk
implementation libraries.beamExtensionAvro
}

ext {
// to be tested in integration test
jacocoCoverageThreshold = 0.00
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.linkedin.venice.beam.consumer;

import com.linkedin.davinci.consumer.VeniceChangeCoordinate;
import java.util.Objects;
import java.util.Set;


/**
* Properties used by {@link com.linkedin.davinci.consumer.VeniceChangelogConsumer} to seek
* checkpoints.
*/
public class CheckPointProperties {
private Set<VeniceChangeCoordinate> coordinates;
private long seekTimestamp;
private String store;

public CheckPointProperties(Set<VeniceChangeCoordinate> coordinates, long seekTimestamp, String store) {
this.coordinates = coordinates;
this.seekTimestamp = seekTimestamp;
this.store = Objects.requireNonNull(store);
}

public Set<VeniceChangeCoordinate> getCoordinates() {
return coordinates;
}

public long getSeekTimestamp() {
return seekTimestamp;
}

public String getStore() {
return store;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.linkedin.venice.beam.consumer;

import com.linkedin.davinci.consumer.VeniceChangelogConsumer;
import com.linkedin.davinci.consumer.VeniceChangelogConsumerClientFactory;
import java.lang.reflect.InvocationTargetException;


/** Provides a configured {@link VeniceChangelogConsumer} instance. */
public class LocalVeniceChangelogConsumerProvider<K, V> implements VeniceChangelogConsumerProvider<K, V> {
private static final long serialVersionUID = 1L;

private final Class<? extends VeniceChangelogConsumerClientFactory> _veniceChangelogConsumerClientFactoryClass;

public LocalVeniceChangelogConsumerProvider(
Class<? extends VeniceChangelogConsumerClientFactory> veniceChangelogConsumerClientFactoryClass) {
_veniceChangelogConsumerClientFactoryClass = veniceChangelogConsumerClientFactoryClass;
}

@Override
public VeniceChangelogConsumer<K, V> getVeniceChangelogConsumer(String storeName)
throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
VeniceChangelogConsumerClientFactory veniceChangelogConsumerClientFactory =
this._veniceChangelogConsumerClientFactoryClass.getDeclaredConstructor().newInstance();
return veniceChangelogConsumerClientFactory.getChangelogConsumer(storeName);
}

@Override
public VeniceChangelogConsumer<K, V> getVeniceChangelogConsumer(String storeName, String consumerId)
throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
VeniceChangelogConsumerClientFactory veniceChangelogConsumerClientFactory =
this._veniceChangelogConsumerClientFactoryClass.getDeclaredConstructor().newInstance();
return veniceChangelogConsumerClientFactory.getChangelogConsumer(storeName, consumerId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.linkedin.venice.beam.consumer;

import com.linkedin.davinci.consumer.ChangeEvent;
import com.linkedin.davinci.consumer.VeniceChangeCoordinate;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nonnull;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.values.TypeDescriptor;


/** Uses {@link AvroCoder} to encode/decode {@link PubSubMessage}s. */
public class PubSubMessageCoder<K, V>
extends StructuredCoder<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> {
private static final long serialVersionUID = 1L;

private final AvroCoder<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> pubSubMessageAvroCoder =
AvroCoder.of(new TypeDescriptor<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>>() {
});

public static <K, V> PubSubMessageCoder<K, V> of() {
return new PubSubMessageCoder<>();
}

@Override
public void encode(PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate> value, @Nonnull OutputStream outStream)
throws IOException {
pubSubMessageAvroCoder.encode(value, outStream);
}

@Override
public PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate> decode(@Nonnull InputStream inStream)
throws IOException {
return pubSubMessageAvroCoder.decode(inStream);
}

@Override
public @Nonnull List<? extends Coder<?>> getCoderArguments() {
return Collections.singletonList(pubSubMessageAvroCoder);
}

@Override
public void verifyDeterministic() {
}
}
Loading

0 comments on commit 7f553be

Please sign in to comment.