Skip to content

Commit

Permalink
[compat] [controller] add rt topic name in store config (#1345)
Browse files Browse the repository at this point in the history
* add a new field  in store and store-version's hybrid configs and add apis in Utils to get real time topic name using these configs
* address review comments
  • Loading branch information
arjun4084346 authored Nov 26, 2024
1 parent b2d29b8 commit cc6e3db
Show file tree
Hide file tree
Showing 16 changed files with 1,862 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class ControllerApiConstants {
public static final String TIME_LAG_TO_GO_ONLINE = "time_lag_to_go_online";
public static final String DATA_REPLICATION_POLICY = "data_replication_policy";
public static final String BUFFER_REPLAY_POLICY = "buffer_replay_policy";
public static final String REAL_TIME_TOPIC_NAME = "real_time_topic_name";
public static final String COMPRESSION_STRATEGY = "compression_strategy";
public static final String CLIENT_DECOMPRESSION_ENABLED = "client_decompression_enabled";
public static final String CHUNKING_ENABLED = "chunking_enabled";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import static com.linkedin.venice.controllerapi.ControllerApiConstants.PUSH_STREAM_SOURCE_ADDRESS;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.READ_COMPUTATION_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.READ_QUOTA_IN_CU;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.REAL_TIME_TOPIC_NAME;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.REGIONS_FILTER;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.REGULAR_VERSION_ETL_ENABLED;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.REPLICATE_ALL_CONFIGS;
Expand Down Expand Up @@ -359,6 +360,14 @@ public Optional<BufferReplayPolicy> getHybridBufferReplayPolicy() {
return Optional.ofNullable(params.get(BUFFER_REPLAY_POLICY)).map(BufferReplayPolicy::valueOf);
}

public UpdateStoreQueryParams setRealTimeTopicName(String realTimeTopicName) {
return putString(REAL_TIME_TOPIC_NAME, realTimeTopicName);
}

public Optional<String> getRealTimeTopicName() {
return getString(REAL_TIME_TOPIC_NAME);
}

public UpdateStoreQueryParams setAccessControlled(boolean accessControlled) {
return putBoolean(ACCESS_CONTROLLED, accessControlled);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,9 @@ public interface HybridStoreConfig extends DataModelBackedStructure<StoreHybridC

BufferReplayPolicy getBufferReplayPolicy();

String getRealTimeTopicName();

void setRealTimeTopicName(String realTimeTopicName);

HybridStoreConfig clone();
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.venice.meta;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand All @@ -17,15 +18,33 @@ public class HybridStoreConfigImpl implements HybridStoreConfig {
public static final long DEFAULT_REWIND_TIME_IN_SECONDS = Time.SECONDS_PER_DAY;
public static final long DEFAULT_HYBRID_TIME_LAG_THRESHOLD = -1L;
public static final long DEFAULT_HYBRID_OFFSET_LAG_THRESHOLD = 1000L;
public static final String DEFAULT_REAL_TIME_TOPIC_NAME = "";

private final StoreHybridConfig hybridConfig;

public HybridStoreConfigImpl(
long rewindTimeInSeconds,
long offsetLagThresholdToGoOnline,
long producerTimestampLagThresholdToGoOnlineInSeconds,
DataReplicationPolicy dataReplicationPolicy,
BufferReplayPolicy bufferReplayPolicy) {
this(
rewindTimeInSeconds,
offsetLagThresholdToGoOnline,
producerTimestampLagThresholdToGoOnlineInSeconds,
dataReplicationPolicy,
bufferReplayPolicy,
DEFAULT_REAL_TIME_TOPIC_NAME);
}

@JsonCreator
public HybridStoreConfigImpl(
@JsonProperty("rewindTimeInSeconds") long rewindTimeInSeconds,
@JsonProperty("offsetLagThresholdToGoOnline") long offsetLagThresholdToGoOnline,
@JsonProperty("producerTimestampLagThresholdToGoOnlineInSeconds") long producerTimestampLagThresholdToGoOnlineInSeconds,
@JsonProperty("dataReplicationPolicy") DataReplicationPolicy dataReplicationPolicy,
@JsonProperty("bufferReplayPolicy") BufferReplayPolicy bufferReplayPolicy) {
@JsonProperty("bufferReplayPolicy") BufferReplayPolicy bufferReplayPolicy,
@JsonProperty("realTimeTopicName") String realTimeTopicName) {
this.hybridConfig = new StoreHybridConfig();
this.hybridConfig.rewindTimeInSeconds = rewindTimeInSeconds;
this.hybridConfig.offsetLagThresholdToGoOnline = offsetLagThresholdToGoOnline;
Expand All @@ -37,6 +56,7 @@ public HybridStoreConfigImpl(
: dataReplicationPolicy.getValue();
this.hybridConfig.bufferReplayPolicy =
bufferReplayPolicy == null ? BufferReplayPolicy.REWIND_FROM_EOP.getValue() : bufferReplayPolicy.getValue();
this.hybridConfig.realTimeTopicName = realTimeTopicName;
}

HybridStoreConfigImpl(StoreHybridConfig config) {
Expand Down Expand Up @@ -83,6 +103,16 @@ public BufferReplayPolicy getBufferReplayPolicy() {
return BufferReplayPolicy.valueOf(this.hybridConfig.bufferReplayPolicy);
}

@Override
public String getRealTimeTopicName() {
return this.hybridConfig.realTimeTopicName.toString();
}

@Override
public void setRealTimeTopicName(String realTimeTopicName) {
this.hybridConfig.realTimeTopicName = realTimeTopicName;
}

@Override
public StoreHybridConfig dataModel() {
return this.hybridConfig;
Expand Down Expand Up @@ -112,6 +142,7 @@ public HybridStoreConfig clone() {
getOffsetLagThresholdToGoOnline(),
getProducerTimestampLagThresholdToGoOnlineInSeconds(),
getDataReplicationPolicy(),
getBufferReplayPolicy());
getBufferReplayPolicy(),
getRealTimeTopicName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,16 @@ public BufferReplayPolicy getBufferReplayPolicy() {
return this.delegate.getBufferReplayPolicy();
}

@Override
public String getRealTimeTopicName() {
return this.delegate.getRealTimeTopicName();
}

@Override
public void setRealTimeTopicName(String realTimeTopicName) {
throw new UnsupportedOperationException();
}

@Override
public HybridStoreConfig clone() {
return this.delegate.clone();
Expand Down Expand Up @@ -513,6 +523,11 @@ public void setUseVersionLevelIncrementalPushEnabled(boolean versionLevelIncreme
throw new UnsupportedOperationException();
}

@Override
public boolean isHybrid() {
return this.delegate.isHybrid();
}

@Override
public HybridStoreConfig getHybridStoreConfig() {
HybridStoreConfig config = this.delegate.getHybridStoreConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ default void setLeaderFollowerModelEnabled(boolean leaderFollowerModelEnabled) {

void setUseVersionLevelIncrementalPushEnabled(boolean versionLevelIncrementalPushEnabled);

boolean isHybrid();

HybridStoreConfig getHybridStoreConfig();

void setHybridStoreConfig(HybridStoreConfig hybridConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,11 @@ public void setUseVersionLevelIncrementalPushEnabled(boolean versionLevelIncreme
this.storeVersion.useVersionLevelIncrementalPushEnabled = versionLevelIncrementalPushEnabled;
}

@Override
public boolean isHybrid() {
return getHybridStoreConfig() != null;
}

@Override
public HybridStoreConfig getHybridStoreConfig() {
if (this.storeVersion.hybridConfig == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public enum AvroProtocolDefinition {
*
* TODO: Move AdminOperation to venice-common module so that we can properly reference it here.
*/
ADMIN_OPERATION(82, SpecificData.get().getSchema(ByteBuffer.class), "AdminOperation"),
ADMIN_OPERATION(83, SpecificData.get().getSchema(ByteBuffer.class), "AdminOperation"),

/**
* Single chunk of a large multi-chunk value. Just a bunch of bytes.
Expand Down Expand Up @@ -143,7 +143,7 @@ public enum AvroProtocolDefinition {
/**
* Value schema for metadata system store.
*/
METADATA_SYSTEM_SCHEMA_STORE(25, StoreMetaValue.class),
METADATA_SYSTEM_SCHEMA_STORE(26, StoreMetaValue.class),

/**
* Key schema for push status system store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
import com.linkedin.venice.helix.HelixState;
import com.linkedin.venice.helix.Replica;
import com.linkedin.venice.helix.ResourceAssignment;
import com.linkedin.venice.meta.HybridStoreConfig;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.Partition;
import com.linkedin.venice.meta.PartitionAssignment;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.RoutingDataRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
Expand Down Expand Up @@ -66,6 +68,7 @@
import org.apache.http.HttpStatus;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Strings;


/**
Expand Down Expand Up @@ -529,6 +532,101 @@ public static File getTempDataDirectory(String prefix) {
}
}

/** This method should only be used for system stores.
* For other stores, use {@link Utils#getRealTimeTopicName(Store)}, {@link Utils#getRealTimeTopicName(StoreInfo)} or
* {@link Utils#getRealTimeTopicName(Version)}
*/
public static String composeRealTimeTopic(String storeName) {
return storeName + Version.REAL_TIME_TOPIC_SUFFIX;
}

/**
* It follows the following order to search for real time topic name,
* i) current store-version config, ii) store config, iii) other store-version configs, iv) default name
*/
public static String getRealTimeTopicName(Store store) {
return getRealTimeTopicName(
store.getName(),
store.getVersions(),
store.getCurrentVersion(),
store.getHybridStoreConfig());
}

public static String getRealTimeTopicName(StoreInfo storeInfo) {
return getRealTimeTopicName(
storeInfo.getName(),
storeInfo.getVersions(),
storeInfo.getCurrentVersion(),
storeInfo.getHybridStoreConfig());
}

public static String getRealTimeTopicName(Version version) {
HybridStoreConfig hybridStoreConfig = version.getHybridStoreConfig();
if (hybridStoreConfig != null) {
String realTimeTopicName = version.getHybridStoreConfig().getRealTimeTopicName();
return getRealTimeTopicNameIfEmpty(realTimeTopicName, version.getStoreName());
} else {
// if the version is not hybrid, caller should not ask for the real time topic,
// but unfortunately that happens, so instead of throwing exception, we just return a default name.
return composeRealTimeTopic(version.getStoreName());
}
}

static String getRealTimeTopicName(
String storeName,
List<Version> versions,
int currentVersionNumber,
HybridStoreConfig hybridStoreConfig) {
if (currentVersionNumber < 1) {
return composeRealTimeTopic(storeName);
}

Optional<Version> currentVersion =
versions.stream().filter(version -> version.getNumber() == currentVersionNumber).findFirst();
if (currentVersion.isPresent() && currentVersion.get().isHybrid()) {
String realTimeTopicName = currentVersion.get().getHybridStoreConfig().getRealTimeTopicName();
if (Strings.isNotBlank(realTimeTopicName)) {
return realTimeTopicName;
}
}

if (hybridStoreConfig != null) {
String realTimeTopicName = hybridStoreConfig.getRealTimeTopicName();
return getRealTimeTopicNameIfEmpty(realTimeTopicName, storeName);
}

Set<String> realTimeTopicNames = new HashSet<>();

for (Version version: versions) {
try {
if (version.isHybrid()) {
String realTimeTopicName = version.getHybridStoreConfig().getRealTimeTopicName();
if (Strings.isNotBlank(realTimeTopicName)) {
realTimeTopicNames.add(realTimeTopicName);
}
}
} catch (VeniceException e) {
// just try another version
}
}

if (realTimeTopicNames.size() > 1) {
LOGGER.warn(
"Store " + storeName + " and current version are not hybrid, yet " + realTimeTopicNames.size()
+ " older versions are using real time topics. Will return one of them.");
}

if (!realTimeTopicNames.isEmpty()) {
return realTimeTopicNames.iterator().next();
}

return composeRealTimeTopic(storeName);
}

private static String getRealTimeTopicNameIfEmpty(String realTimeTopicName, String storeName) {
return Strings.isBlank(realTimeTopicName) ? composeRealTimeTopic(storeName) : realTimeTopicName;
}

private static class TimeUnitInfo {
String suffix;
int multiplier;
Expand Down
Loading

0 comments on commit cc6e3db

Please sign in to comment.