From 7f553beda91ecb22908948abf5bb7b297bfaa4b9 Mon Sep 17 00:00:00 2001 From: suketkar Date: Sat, 23 Nov 2024 01:07:13 +0530 Subject: [PATCH] [beam] VeniceChangelogConsumerIO for Apache Beam integration (#1300) Introduced new VeniceChangelogConsumerIO and related classes so that Apache Beam stream processing jobs can consume the Venice changelog. This code is added in a new integrations:venice-beam module. --- build.gradle | 2 + docs/dev_guide/navigating_project.md | 1 + gradle/spotbugs/exclude.xml | 10 + integrations/venice-beam/build.gradle | 20 + .../beam/consumer/CheckPointProperties.java | 34 + .../LocalVeniceChangelogConsumerProvider.java | 34 + .../beam/consumer/PubSubMessageCoder.java | 51 ++ .../consumer/VeniceChangelogConsumerIO.java | 609 ++++++++++++++++++ .../VeniceChangelogConsumerProvider.java | 15 + .../beam/consumer/VeniceCheckpointMark.java | 73 +++ .../beam/consumer/VeniceMessageCoder.java | 63 ++ settings.gradle | 1 + 12 files changed, 913 insertions(+) create mode 100644 integrations/venice-beam/build.gradle create mode 100644 integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/CheckPointProperties.java create mode 100644 integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/LocalVeniceChangelogConsumerProvider.java create mode 100644 integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/PubSubMessageCoder.java create mode 100644 integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/VeniceChangelogConsumerIO.java create mode 100644 integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/VeniceChangelogConsumerProvider.java create mode 100644 integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/VeniceCheckpointMark.java create mode 100644 integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/VeniceMessageCoder.java diff --git a/build.gradle b/build.gradle index 5c91d3b508..f66f3dc852 100644 --- a/build.gradle +++ b/build.gradle @@ -122,6 +122,8 @@ ext.libraries = [ restliCommon: "com.linkedin.pegasus:restli-common:${pegasusVersion}", rocksdbjni: 'org.rocksdb:rocksdbjni:8.8.1', samzaApi: 'org.apache.samza:samza-api:1.5.1', + beamSdk: 'org.apache.beam:beam-sdks-java-core:2.60.0', + beamExtensionAvro: 'org.apache.beam:beam-sdks-java-extensions-avro:2.60.0', slf4j: 'org.slf4j:slf4j:1.7.36', slf4jApi: 'org.slf4j:slf4j-api:1.7.36', slf4jSimple: 'org.slf4j:slf4j-simple:1.7.36', diff --git a/docs/dev_guide/navigating_project.md b/docs/dev_guide/navigating_project.md index ba6eaeb239..31ac28cc75 100644 --- a/docs/dev_guide/navigating_project.md +++ b/docs/dev_guide/navigating_project.md @@ -22,6 +22,7 @@ The Venice codebase is split across these directories: minimal Venice-specific logic, and be mostly just glue code to satisfy the contracts expected by the third-party system. Also, these modules are intended to minimize the dependency burden of the other client libraries. Those include: + - `venice-beam`, which implements the Beam Read API, enabling a Beam job to consume the Venice changelog. - `venice-pulsar`, which contains an implementation of a Pulsar [Sink](https://pulsar.apache.org/docs/next/io-overview/#sink), in order to feed data from Pulsar topics to Venice. - `venice-samza`, which contains an implementation of a Samza [SystemProducer](https://samza.apache.org/learn/documentation/latest/api/javadocs/org/apache/samza/system/SystemProducer.html), diff --git a/gradle/spotbugs/exclude.xml b/gradle/spotbugs/exclude.xml index 3725224437..d97431bb02 100644 --- a/gradle/spotbugs/exclude.xml +++ b/gradle/spotbugs/exclude.xml @@ -478,4 +478,14 @@ + + + + + + + + + + diff --git a/integrations/venice-beam/build.gradle b/integrations/venice-beam/build.gradle new file mode 100644 index 0000000000..ad3ea0ecac --- /dev/null +++ b/integrations/venice-beam/build.gradle @@ -0,0 +1,20 @@ +dependencies { + implementation(project(':internal:venice-common')) { + exclude module: 'kafka_2.10' + exclude group: 'org.scala-lang' + } + implementation project(':clients:da-vinci-client') + implementation project(':clients:venice-thin-client') + implementation project(':clients:venice-client') + + implementation libraries.log4j2api + implementation libraries.log4j2core + + implementation libraries.beamSdk + implementation libraries.beamExtensionAvro +} + +ext { + // to be tested in integration test + jacocoCoverageThreshold = 0.00 +} diff --git a/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/CheckPointProperties.java b/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/CheckPointProperties.java new file mode 100644 index 0000000000..149c0ab430 --- /dev/null +++ b/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/CheckPointProperties.java @@ -0,0 +1,34 @@ +package com.linkedin.venice.beam.consumer; + +import com.linkedin.davinci.consumer.VeniceChangeCoordinate; +import java.util.Objects; +import java.util.Set; + + +/** + * Properties used by {@link com.linkedin.davinci.consumer.VeniceChangelogConsumer} to seek + * checkpoints. + */ +public class CheckPointProperties { + private Set coordinates; + private long seekTimestamp; + private String store; + + public CheckPointProperties(Set coordinates, long seekTimestamp, String store) { + this.coordinates = coordinates; + this.seekTimestamp = seekTimestamp; + this.store = Objects.requireNonNull(store); + } + + public Set getCoordinates() { + return coordinates; + } + + public long getSeekTimestamp() { + return seekTimestamp; + } + + public String getStore() { + return store; + } +} diff --git a/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/LocalVeniceChangelogConsumerProvider.java b/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/LocalVeniceChangelogConsumerProvider.java new file mode 100644 index 0000000000..bd528a0672 --- /dev/null +++ b/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/LocalVeniceChangelogConsumerProvider.java @@ -0,0 +1,34 @@ +package com.linkedin.venice.beam.consumer; + +import com.linkedin.davinci.consumer.VeniceChangelogConsumer; +import com.linkedin.davinci.consumer.VeniceChangelogConsumerClientFactory; +import java.lang.reflect.InvocationTargetException; + + +/** Provides a configured {@link VeniceChangelogConsumer} instance. */ +public class LocalVeniceChangelogConsumerProvider implements VeniceChangelogConsumerProvider { + private static final long serialVersionUID = 1L; + + private final Class _veniceChangelogConsumerClientFactoryClass; + + public LocalVeniceChangelogConsumerProvider( + Class veniceChangelogConsumerClientFactoryClass) { + _veniceChangelogConsumerClientFactoryClass = veniceChangelogConsumerClientFactoryClass; + } + + @Override + public VeniceChangelogConsumer getVeniceChangelogConsumer(String storeName) + throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException { + VeniceChangelogConsumerClientFactory veniceChangelogConsumerClientFactory = + this._veniceChangelogConsumerClientFactoryClass.getDeclaredConstructor().newInstance(); + return veniceChangelogConsumerClientFactory.getChangelogConsumer(storeName); + } + + @Override + public VeniceChangelogConsumer getVeniceChangelogConsumer(String storeName, String consumerId) + throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException { + VeniceChangelogConsumerClientFactory veniceChangelogConsumerClientFactory = + this._veniceChangelogConsumerClientFactoryClass.getDeclaredConstructor().newInstance(); + return veniceChangelogConsumerClientFactory.getChangelogConsumer(storeName, consumerId); + } +} diff --git a/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/PubSubMessageCoder.java b/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/PubSubMessageCoder.java new file mode 100644 index 0000000000..9565302fa2 --- /dev/null +++ b/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/PubSubMessageCoder.java @@ -0,0 +1,51 @@ +package com.linkedin.venice.beam.consumer; + +import com.linkedin.davinci.consumer.ChangeEvent; +import com.linkedin.davinci.consumer.VeniceChangeCoordinate; +import com.linkedin.venice.pubsub.api.PubSubMessage; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nonnull; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StructuredCoder; +import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; +import org.apache.beam.sdk.values.TypeDescriptor; + + +/** Uses {@link AvroCoder} to encode/decode {@link PubSubMessage}s. */ +public class PubSubMessageCoder + extends StructuredCoder, VeniceChangeCoordinate>> { + private static final long serialVersionUID = 1L; + + private final AvroCoder, VeniceChangeCoordinate>> pubSubMessageAvroCoder = + AvroCoder.of(new TypeDescriptor, VeniceChangeCoordinate>>() { + }); + + public static PubSubMessageCoder of() { + return new PubSubMessageCoder<>(); + } + + @Override + public void encode(PubSubMessage, VeniceChangeCoordinate> value, @Nonnull OutputStream outStream) + throws IOException { + pubSubMessageAvroCoder.encode(value, outStream); + } + + @Override + public PubSubMessage, VeniceChangeCoordinate> decode(@Nonnull InputStream inStream) + throws IOException { + return pubSubMessageAvroCoder.decode(inStream); + } + + @Override + public @Nonnull List> getCoderArguments() { + return Collections.singletonList(pubSubMessageAvroCoder); + } + + @Override + public void verifyDeterministic() { + } +} diff --git a/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/VeniceChangelogConsumerIO.java b/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/VeniceChangelogConsumerIO.java new file mode 100644 index 0000000000..62666d6eff --- /dev/null +++ b/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/VeniceChangelogConsumerIO.java @@ -0,0 +1,609 @@ +package com.linkedin.venice.beam.consumer; + +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; +import com.linkedin.davinci.consumer.ChangeEvent; +import com.linkedin.davinci.consumer.VeniceChangeCoordinate; +import com.linkedin.davinci.consumer.VeniceChangelogConsumer; +import com.linkedin.davinci.consumer.VeniceCoordinateOutOfRangeException; +import com.linkedin.venice.pubsub.api.PubSubMessage; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.Read.Unbounded; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Distribution; +import org.apache.beam.sdk.metrics.Gauge; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.joda.time.Duration; +import org.joda.time.Instant; + + +/** + * Beam Connector for Venice Change data capture IO. Uses {@link VeniceChangelogConsumer} underneath + * to pull messages from configured venice store. + */ +public final class VeniceChangelogConsumerIO { + private static final Logger LOG = LogManager.getLogger(VeniceChangelogConsumerIO.class); + + private VeniceChangelogConsumerIO() { + } + + public static Read read() { + return new Read<>(); + } + + public static class Read + extends PTransform, VeniceChangeCoordinate>>> { + private static final long serialVersionUID = 1L; + + public enum SeekWhence + implements BiFunction> { + CHECKPOINT { + @Override + public CompletableFuture apply( + VeniceChangelogConsumer consumer, + CheckPointProperties checkPointProperties) { + Set coordinates = checkPointProperties.getCoordinates(); + LOG.info("Seeking To Coordinates {} for store {}", coordinates, checkPointProperties.getStore()); + return consumer.seekToCheckpoint(coordinates); + } + }, + END_OF_PUSH { + @Override + public CompletableFuture apply( + VeniceChangelogConsumer consumer, + CheckPointProperties checkPointProperties) { + LOG.info("Seeking To EndOfPush for store {}", checkPointProperties.getStore()); + return consumer.seekToEndOfPush(); + } + }, + START_OF_PUSH { + @Override + public CompletableFuture apply( + VeniceChangelogConsumer consumer, + CheckPointProperties checkPointProperties) { + LOG.info("Seeking To BeginningOfPush for store {}", checkPointProperties.getStore()); + return consumer.seekToBeginningOfPush(); + } + }, + TAIL { + @Override + public CompletableFuture apply( + VeniceChangelogConsumer consumer, + CheckPointProperties checkPointProperties) { + LOG.info("Seeking To Tail for store {}", checkPointProperties.getStore()); + return consumer.seekToTail(); + } + }, + TIMESTAMP { + // Seeks to given epoch timestamp in milliseconds if positive or rewinds by given + // milliseconds from current time if negative. + @Override + public CompletableFuture apply( + VeniceChangelogConsumer consumer, + CheckPointProperties checkPointProperties) { + long seekTimestamp; + if (checkPointProperties.getSeekTimestamp() < 0) { + seekTimestamp = System.currentTimeMillis() + checkPointProperties.getSeekTimestamp(); + } else { + seekTimestamp = checkPointProperties.getSeekTimestamp(); + } + LOG.info("Seeking To Timestamp {} for store {}", seekTimestamp, checkPointProperties.getStore()); + return consumer.seekToTimestamp(seekTimestamp); + } + }; + } + + private Set partitions = Collections.emptySet(); + private Duration pollTimeout = Duration.standardSeconds(1); + // Positive timestamp is treated as epoch time in milliseconds, negative timestamp is treated as + // rewind time in milliseconds (from current time) + private long seekTimestamp; + private SeekWhence seekWhence = SeekWhence.CHECKPOINT; + + private String store = ""; + private String consumerIdSuffix = ""; + private Duration terminationTimeout = Duration.standardSeconds(30); + + private LocalVeniceChangelogConsumerProvider localVeniceChangelogConsumerProvider; + + public Read() { + } + + public Read(Read read) { + this.consumerIdSuffix = read.consumerIdSuffix; + this.partitions = read.partitions; + this.pollTimeout = read.pollTimeout; + this.seekWhence = read.seekWhence; + this.store = read.store; + this.terminationTimeout = read.terminationTimeout; + this.seekTimestamp = read.seekTimestamp; + this.localVeniceChangelogConsumerProvider = read.localVeniceChangelogConsumerProvider; + } + + @Override + public PCollection, VeniceChangeCoordinate>> expand(PBegin input) { + Source source = new Source<>(this); + Unbounded, VeniceChangeCoordinate>> unbounded = + org.apache.beam.sdk.io.Read.from(source); + return input.getPipeline().apply(unbounded); + } + + public Duration getPollTimeout() { + return this.pollTimeout; + } + + public Read setPollTimeout(Duration timeout) { + this.pollTimeout = timeout; + return this; + } + + public SeekWhence getSeekWhence() { + return this.seekWhence; + } + + public Read setSeekWhence(SeekWhence seekWhence) { + this.seekWhence = seekWhence; + return this; + } + + public String getStore() { + return this.store; + } + + public Read setStore(String store) { + this.store = store; + return this; + } + + public Duration getTerminationTimeout() { + return this.terminationTimeout; + } + + public Read setTerminationTimeout(Duration timeout) { + this.terminationTimeout = timeout; + return this; + } + + public long getSeekTimestamp() { + return seekTimestamp; + } + + public Read setSeekTimestamp(long seekTimestamp) { + this.seekTimestamp = seekTimestamp; + return this; + } + + public Set getPartitions() { + return partitions; + } + + public Read setPartitions(Set partitions) { + this.partitions = partitions; + return this; + } + + public LocalVeniceChangelogConsumerProvider getLocalVeniceChangelogConsumerProvider() { + return localVeniceChangelogConsumerProvider; + } + + public Read setLocalVeniceChangelogConsumerProvider( + LocalVeniceChangelogConsumerProvider localVeniceChangelogConsumerProvider) { + this.localVeniceChangelogConsumerProvider = localVeniceChangelogConsumerProvider; + return this; + } + + public String getConsumerIdSuffix() { + return consumerIdSuffix; + } + + public Read setConsumerIdSuffix(String consumerIdSuffix) { + this.consumerIdSuffix = consumerIdSuffix; + return this; + } + + public RemoveMetadata withoutMetadata() { + return new RemoveMetadata(); + } + + public CurrentValueTransform withOnlyCurrentValue(Coder returnTypeCoder) { + return new CurrentValueTransform(returnTypeCoder); + } + + class RemoveMetadata extends PTransform>>> { + private static final long serialVersionUID = 1L; + + @Override + public @Nonnull PCollection>> expand(PBegin pBegin) { + PCollection, VeniceChangeCoordinate>> input = pBegin.apply(Read.this); + return input.apply( + MapElements.via( + new SimpleFunction, VeniceChangeCoordinate>, KV>>() { + @Override + public KV> apply(PubSubMessage, VeniceChangeCoordinate> message) { + return KV.of(message.getKey(), message.getValue()); + } + })) + .setCoder(VeniceMessageCoder.of()); + } + } + + private class CurrentValueTransform extends PTransform>> { + private static final long serialVersionUID = 1L; + private final Coder> _returnTypeCoder; + + CurrentValueTransform(Coder> returnTypeCoder) { + _returnTypeCoder = Objects.requireNonNull(returnTypeCoder); + } + + @Override + public @Nonnull PCollection> expand(PBegin pBegin) { + PCollection>> pCollection = pBegin.apply(new RemoveMetadata()); + return pCollection.apply(MapElements.via(new SimpleFunction>, KV>() { + @Override + public KV apply(KV> message) { + GenericData.Record value = (GenericData.Record) message.getValue().getCurrentValue(); + return KV.of(message.getKey(), value); + } + })).setCoder(_returnTypeCoder); + } + } + } + + private static class Source + extends UnboundedSource, VeniceChangeCoordinate>, VeniceCheckpointMark> { + private static final long serialVersionUID = 1L; + private final Read read; + + Source(Read read) { + this.read = read; + } + + @Override + public @Nonnull List> split(int desiredNumSplits, @Nonnull PipelineOptions options) + throws Exception { + Set partitions = this.read.getPartitions(); + if (partitions.isEmpty()) { + partitions = + IntStream + .range( + 0, + this.read.localVeniceChangelogConsumerProvider.getVeniceChangelogConsumer(this.read.getStore()) + .getPartitionCount()) + .boxed() + .collect(Collectors.toSet()); + LOG.info("Detected store {} has {} partitions", this.read.getStore(), partitions.size()); + } + + // NOTE: Enforces all splits have the same # of partitions. + int numSplits = Math.min(desiredNumSplits, partitions.size()); + while (partitions.size() % numSplits > 0) { + ++numSplits; + } + + List> partitionSplits = new ArrayList<>(numSplits); + for (int i = 0; i < numSplits; ++i) { + partitionSplits.add(new HashSet<>()); + } + for (int partition: partitions) { + partitionSplits.get(partition % numSplits).add(partition); + } + + List> sourceSplits = new ArrayList<>(numSplits); + for (Set partitionSplit: partitionSplits) { + sourceSplits.add(new Source<>(new Read(this.read).setPartitions(partitionSplit))); + } + return sourceSplits; + } + + @Override + public @Nonnull UnboundedReader, VeniceChangeCoordinate>> createReader( + @Nonnull PipelineOptions options, + VeniceCheckpointMark checkpointMark) { + LOG.debug("Creating reader for store {} and partitions {}", this.read.getStore(), this.read.getPartitions()); + return new PubSubMessageReader<>(this.read, checkpointMark); + } + + @Override + public @Nonnull Coder, VeniceChangeCoordinate>> getOutputCoder() { + return PubSubMessageCoder.of(); + } + + @Override + public @Nonnull Coder getCheckpointMarkCoder() { + return new VeniceCheckpointMark.Coder(); + } + } + + private static class PubSubMessageReader + extends UnboundedSource.UnboundedReader, VeniceChangeCoordinate>> { + private final Read read; + private final Map _partitionToVeniceChangeCoordinates = new HashMap<>(); + + // NOTE: Poll consumer using separate thread for performance. + private final ExecutorService service = Executors.newSingleThreadExecutor(); + private final BlockingQueue, VeniceChangeCoordinate>>> queue = + new SynchronousQueue<>(); + private final AtomicBoolean closed = new AtomicBoolean(false); + private PeekingIterator, VeniceChangeCoordinate>> batch = + Iterators.peekingIterator(Collections.emptyIterator()); + + private final Gauge checkpointPubsubTimestamp; + private final Gauge checkpointPubsubLag; + private final Counter revisedCheckpoints; + private final Distribution sampledPayloadSize; + private final Counter consumerPollCount; + private final Counter queuePollCount; + + private VeniceChangelogConsumer consumer; + + PubSubMessageReader(Read read, @Nullable VeniceCheckpointMark veniceCheckpointMark) { + this.read = read; + if (veniceCheckpointMark != null) { + veniceCheckpointMark.getVeniceChangeCoordinates() + .forEach(c -> _partitionToVeniceChangeCoordinates.put(c.getPartition(), c)); + } + final String metricPrefix = String.join("_", this.read.store, this.read.partitions.toString()); + this.checkpointPubsubTimestamp = + Metrics.gauge(VeniceChangelogConsumerIO.class, metricPrefix + "_CheckpointPubsubTimestamp"); + this.checkpointPubsubLag = Metrics.gauge(VeniceChangelogConsumerIO.class, metricPrefix + "_CheckpointPubsubLag"); + this.revisedCheckpoints = Metrics.counter(VeniceChangelogConsumerIO.class, metricPrefix + "_RevisedCheckpoints"); + this.sampledPayloadSize = + Metrics.distribution(VeniceChangelogConsumerIO.class, metricPrefix + "_SampledPayloadSize"); + this.consumerPollCount = Metrics.counter(VeniceChangelogConsumerIO.class, metricPrefix + "_ConsumerPollCount"); + this.queuePollCount = Metrics.counter(VeniceChangelogConsumerIO.class, metricPrefix + "_QueuePollCount"); + } + + /** + * Initializes veniceChangeLogConsumer and subscribes to partitions as prescribed by {@link + * Source#split(int, PipelineOptions)}. + */ + @SuppressWarnings("FutureReturnValueIgnored") + @Override + public boolean start() { + try { + this.consumer = this.read.localVeniceChangelogConsumerProvider.getVeniceChangelogConsumer( + this.read.getStore(), + this.read.getPartitions().toString() + this.read.consumerIdSuffix); + } catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) { + throw new IllegalStateException(e); + } + + try { + this.consumer.subscribe(this.read.partitions).get(); + } catch (ExecutionException | InterruptedException e) { + throw new IllegalStateException(e); + } + + Set veniceChangeCoordinates = read.partitions.stream() + .map(partition -> _partitionToVeniceChangeCoordinates.get(partition)) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + try { + CheckPointProperties checkPointProperties = + new CheckPointProperties(veniceChangeCoordinates, read.seekTimestamp, read.store); + this.read.getSeekWhence().apply(this.consumer, checkPointProperties).get(); + } catch (ExecutionException | InterruptedException e) { + LOG.error( + "Store={} failed to {}={} for partitions={}", + this.read.store, + this.read.seekWhence, + veniceChangeCoordinates, + this.read.partitions, + e); + if (!(e.getCause() instanceof VeniceCoordinateOutOfRangeException)) { + throw new IllegalStateException(e); + } + LOG.warn("SeekingToEndOfPush because checkpoint is likely beyond retention."); + try { + this.consumer.seekToEndOfPush().get(); + } catch (ExecutionException | InterruptedException ee) { + throw new IllegalStateException(ee); + } + } + + // Keep on pulling messages in background + this.service.submit(this::consumerPollLoop); + return advance(); + } + + /** Adds messages to the queue if venice change capture consumer returns any on polling. */ + private void consumerPollLoop() { + Collection, VeniceChangeCoordinate>> messages = Collections.emptyList(); + do { + if (messages.isEmpty()) { + messages = this.consumer.poll(this.read.getPollTimeout().getMillis()); + consumerPollCount.inc(); + LOG.debug( + "Polled & received {} messages from the consumer for partitions {}", + messages.size(), + this.read.getPartitions()); + continue; + } + + try { + // NOTE(SynchronousQueue): Block and wait for another thread to receive. + this.queue.put(messages); + LOG.debug("Added {} messages to queue for partitions {}", messages.size(), this.read.getPartitions()); + messages = Collections.emptyList(); + } catch (InterruptedException e) { + LOG.error("{} consumer thread interrupted", this, e); + break; // exit + } + } while (!this.closed.get()); + LOG.info("{}: Returning from consumer pool loop", this); + } + + /** Fetches latest checkpoint and updates _partitionToVeniceChangeCoordinates. */ + void reviseCheckpoints() { + PubSubMessage, VeniceChangeCoordinate> pubSubMessage = getCurrent(); + if (pubSubMessage == null) { + return; + } + + VeniceChangeCoordinate veniceChangeCoordinate = pubSubMessage.getOffset(); + LOG.debug("Revised checkpoint for partition {}", veniceChangeCoordinate.getPartition()); + _partitionToVeniceChangeCoordinates.put(veniceChangeCoordinate.getPartition(), veniceChangeCoordinate); + + this.checkpointPubsubTimestamp.set(pubSubMessage.getPubSubMessageTime()); + this.checkpointPubsubLag.set(System.currentTimeMillis() - pubSubMessage.getPubSubMessageTime()); + this.revisedCheckpoints.inc(); + this.sampledPayloadSize.update(pubSubMessage.getPayloadSize()); + } + + /** + * Polls messages from the reader and makes them available in queue for consumption. Also + * revises checkpoints whenever the iterator advances. + */ + @Override + public boolean advance() { + if (this.batch.hasNext()) { + this.batch.next(); + // Return if messages are present or else if exhausted continue polling for new messages + if (this.batch.hasNext()) { + return true; + } + } + + Collection, VeniceChangeCoordinate>> messages; + try { + messages = this.queue.poll(this.read.getPollTimeout().getMillis(), TimeUnit.MILLISECONDS); + queuePollCount.inc(); + } catch (InterruptedException e) { + LOG.error("{} advancing thread interrupted", this, e); + return false; + } + + if (messages == null) { + LOG.info( + "{} advancing timed out for store {} and partitions {}", + this, + this.read.getStore(), + this.read.getPartitions()); + return false; + } + + this.batch = Iterators.peekingIterator(messages.iterator()); + reviseCheckpoints(); + LOG.debug( + "Received messages for store {}, partitions {}, number of messages {}", + this.read.getStore(), + this.read.getPartitions(), + messages.size()); + return true; + } + + /** + * Throws {@link NoSuchElementException} if called without iterator initialization else returns + * next element. + */ + @Override + @Nullable + public PubSubMessage, VeniceChangeCoordinate> getCurrent() { + if (!this.batch.hasNext()) { + throw new NoSuchElementException(); + } + return this.batch.peek(); + } + + @Override + public Instant getCurrentTimestamp() { + PubSubMessage, VeniceChangeCoordinate> pubSubMessage = null; + try { + pubSubMessage = this.getCurrent(); + } catch (NoSuchElementException e) { + LOG.debug("No element found defaulting to epoch time for watermark"); + } + if (pubSubMessage == null) { + return Instant.EPOCH; + } + return Instant.ofEpochMilli(this.getCurrent().getPubSubMessageTime()); + } + + @Override + public void close() { + LOG.info("Closing Venice IO connector for store {}, partitions {}", read.store, read.partitions); + this.closed.set(true); + this.service.shutdown(); + + boolean isShutdown; + do { + this.queue.clear(); + try { + isShutdown = + this.service.awaitTermination(this.read.getTerminationTimeout().getStandardSeconds(), TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.warn("{} interrupted while waiting on consumer thread to terminate", this, e); + throw new IllegalStateException(e); + } + } while (!isShutdown); + + // NOTE: Try-catch inside UnboundedSourceSystem closes all readers which otherwise causes NPEs + // on instances not yet started. + if (this.consumer == null) { + return; + } + + try { + this.consumer.close(); + } catch (IllegalStateException e) { + LOG.info( + "Note: Consumer is shared across partitions. Failed to close consumer for store {}, partitions {} since it" + + " might already be closed. Exception reason {}", + read.store, + read.partitions, + e.getMessage()); + } + } + + @Override + public UnboundedSource, VeniceChangeCoordinate>, VeniceCheckpointMark> getCurrentSource() { + return new Source<>(this.read); + } + + @Override + public Instant getWatermark() { + return this.getCurrentTimestamp(); + } + + @Override + public VeniceCheckpointMark getCheckpointMark() { + return new VeniceCheckpointMark(new ArrayList<>(this._partitionToVeniceChangeCoordinates.values())); + } + } +} diff --git a/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/VeniceChangelogConsumerProvider.java b/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/VeniceChangelogConsumerProvider.java new file mode 100644 index 0000000000..ce5b842875 --- /dev/null +++ b/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/VeniceChangelogConsumerProvider.java @@ -0,0 +1,15 @@ +package com.linkedin.venice.beam.consumer; + +import com.linkedin.davinci.consumer.VeniceChangelogConsumer; +import java.io.Serializable; +import java.lang.reflect.InvocationTargetException; + + +/** Interface to provide a configured {@link VeniceChangelogConsumer} instance. */ +public interface VeniceChangelogConsumerProvider extends Serializable { + VeniceChangelogConsumer getVeniceChangelogConsumer(String storeName) + throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException; + + VeniceChangelogConsumer getVeniceChangelogConsumer(String storeName, String consumerId) + throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException; +} diff --git a/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/VeniceCheckpointMark.java b/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/VeniceCheckpointMark.java new file mode 100644 index 0000000000..db19fe3109 --- /dev/null +++ b/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/VeniceCheckpointMark.java @@ -0,0 +1,73 @@ +package com.linkedin.venice.beam.consumer; + +import com.linkedin.davinci.consumer.VeniceChangeCoordinate; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +public class VeniceCheckpointMark implements UnboundedSource.CheckpointMark { + private static final Logger LOG = LogManager.getLogger(VeniceCheckpointMark.class); + + private final List _veniceChangeCoordinates; + + VeniceCheckpointMark(List veniceChangeCoordinates) { + this._veniceChangeCoordinates = veniceChangeCoordinates; + } + + @Override + public void finalizeCheckpoint() { + // do Nothing! + } + + public List getVeniceChangeCoordinates() { + return _veniceChangeCoordinates; + } + + public static class Coder extends AtomicCoder { + private static final long serialVersionUID = 1L; + private final VarIntCoder _sizeCoder = VarIntCoder.of(); + private final StringUtf8Coder _stringUtf8Coder = StringUtf8Coder.of(); + + @Override + public void encode(VeniceCheckpointMark value, OutputStream outStream) throws IOException { + LOG.debug("Encoding {} veniceChangeCoordinates", value.getVeniceChangeCoordinates().size()); + _sizeCoder.encode(value.getVeniceChangeCoordinates().size(), outStream); + value.getVeniceChangeCoordinates().forEach(veniceChangeCoordinate -> { + try { + String encodeString = + VeniceChangeCoordinate.convertVeniceChangeCoordinateToStringAndEncode(veniceChangeCoordinate); + _stringUtf8Coder.encode(encodeString, outStream); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + }); + } + + @Override + public VeniceCheckpointMark decode(InputStream inStream) throws IOException { + int listSize = _sizeCoder.decode(inStream); + LOG.info("Decoding {} veniceChangeCoordinates", listSize); + List veniceChangeCoordinates = new ArrayList<>(listSize); + for (int i = 0; i < listSize; i++) { + String decodedString = _stringUtf8Coder.decode(inStream); + try { + VeniceChangeCoordinate veniceChangeCoordinate = + VeniceChangeCoordinate.decodeStringAndConvertToVeniceChangeCoordinate(decodedString); + veniceChangeCoordinates.add(veniceChangeCoordinate); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException(e); + } + } + return new VeniceCheckpointMark(veniceChangeCoordinates); + } + } +} diff --git a/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/VeniceMessageCoder.java b/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/VeniceMessageCoder.java new file mode 100644 index 0000000000..5917ae77b7 --- /dev/null +++ b/integrations/venice-beam/src/main/java/com/linkedin/venice/beam/consumer/VeniceMessageCoder.java @@ -0,0 +1,63 @@ +package com.linkedin.venice.beam.consumer; + +import com.linkedin.davinci.consumer.ChangeEvent; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.StructuredCoder; +import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TypeDescriptor; + + +/** + * Uses {@link KvCoder} to encode/decode {@link com.linkedin.venice.pubsub.api.PubSubMessage} key + * and value. + */ +public final class VeniceMessageCoder extends StructuredCoder>> { + private static final long serialVersionUID = 1L; + private final AvroCoder keyCoder = AvroCoder.of(new TypeDescriptor() { + }); + private final AvroCoder valueCoder = AvroCoder.of(new TypeDescriptor() { + }); + + private final ListCoder listCoder = ListCoder.of(valueCoder); + private final KvCoder> _kvCoder = KvCoder.of(keyCoder, listCoder); + + private VeniceMessageCoder() { + } + + public static VeniceMessageCoder of() { + return new VeniceMessageCoder<>(); + } + + @Override + public void encode(KV> value, OutputStream outStream) throws IOException { + ChangeEvent changeEvent = value.getValue(); + List values = new ArrayList<>(); + values.add(changeEvent.getCurrentValue()); + values.add(changeEvent.getPreviousValue()); + _kvCoder.encode(KV.of(value.getKey(), values), outStream); + } + + @Override + public KV> decode(InputStream inStream) throws IOException { + KV> kv = _kvCoder.decode(inStream); + return KV.of(kv.getKey(), new ChangeEvent<>(kv.getValue().get(1), kv.getValue().get(0))); + } + + @Override + public List> getCoderArguments() { + return Collections.singletonList(_kvCoder); + } + + @Override + public void verifyDeterministic() { + } +} diff --git a/settings.gradle b/settings.gradle index 9b12bfd5ef..e9eeebc98f 100644 --- a/settings.gradle +++ b/settings.gradle @@ -69,6 +69,7 @@ include 'internal:alpini:router:alpini-router-base' include 'internal:alpini:router:alpini-router-impl' // 3rd-party system integration modules +include 'integrations:venice-beam' include 'integrations:venice-pulsar' include 'integrations:venice-samza'