diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java
index daa27050e7..f2a7aec035 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/ControllerApiConstants.java
@@ -55,6 +55,7 @@ public class ControllerApiConstants {
   public static final String TIME_LAG_TO_GO_ONLINE = "time_lag_to_go_online";
   public static final String DATA_REPLICATION_POLICY = "data_replication_policy";
   public static final String BUFFER_REPLAY_POLICY = "buffer_replay_policy";
+  public static final String REAL_TIME_TOPIC_NAME = "real_time_topic_name";
   public static final String COMPRESSION_STRATEGY = "compression_strategy";
   public static final String CLIENT_DECOMPRESSION_ENABLED = "client_decompression_enabled";
   public static final String CHUNKING_ENABLED = "chunking_enabled";
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/UpdateStoreQueryParams.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/UpdateStoreQueryParams.java
index a6e1ee5ed7..f5e6fd7b28 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/UpdateStoreQueryParams.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/UpdateStoreQueryParams.java
@@ -45,6 +45,7 @@
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.PUSH_STREAM_SOURCE_ADDRESS;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.READ_COMPUTATION_ENABLED;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.READ_QUOTA_IN_CU;
+import static com.linkedin.venice.controllerapi.ControllerApiConstants.REAL_TIME_TOPIC_NAME;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.REGIONS_FILTER;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.REGULAR_VERSION_ETL_ENABLED;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.REPLICATE_ALL_CONFIGS;
@@ -359,6 +360,14 @@ public Optional<BufferReplayPolicy> getHybridBufferReplayPolicy() {
     return Optional.ofNullable(params.get(BUFFER_REPLAY_POLICY)).map(BufferReplayPolicy::valueOf);
   }
 
+  public UpdateStoreQueryParams setRealTimeTopicName(String realTimeTopicName) {
+    return putString(REAL_TIME_TOPIC_NAME, realTimeTopicName);
+  }
+
+  public Optional<String> getRealTimeTopicName() {
+    return getString(REAL_TIME_TOPIC_NAME);
+  }
+
   public UpdateStoreQueryParams setAccessControlled(boolean accessControlled) {
     return putBoolean(ACCESS_CONTROLLED, accessControlled);
   }
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/HybridStoreConfig.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/HybridStoreConfig.java
index 061098317f..6961460e61 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/HybridStoreConfig.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/HybridStoreConfig.java
@@ -26,5 +26,9 @@ public interface HybridStoreConfig extends DataModelBackedStructure<StoreHybrid
   BufferReplayPolicy getBufferReplayPolicy();
 
   HybridStoreConfig clone();
+
+  String getRealTimeTopicName();
+
+  void setRealTimeTopicName(String realTimeTopicName);
 }
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java b/internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java
--- a/internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java
+  public static String getRealTimeTopicName(Store store) {
+    return getRealTimeTopicName(
+        store.getName(),
+        store.getVersions(),
+        store.getCurrentVersion(),
+        store.getHybridStoreConfig());
+  }
+
+  public static String getRealTimeTopicName(StoreInfo storeInfo) {
+    return getRealTimeTopicName(
+        storeInfo.getName(),
+        storeInfo.getVersions(),
+        storeInfo.getCurrentVersion(),
+        storeInfo.getHybridStoreConfig());
+  }
+
+  public static String getRealTimeTopicName(Version version) {
+    HybridStoreConfig hybridStoreConfig = version.getHybridStoreConfig();
+    if (hybridStoreConfig != null) {
+      return getRealTimeTopicNameIfEmpty(hybridStoreConfig.getRealTimeTopicName(), version.getStoreName());
+    }
+    return composeRealTimeTopic(version.getStoreName());
+  }
+
+  public static String getRealTimeTopicName(
+      String storeName,
+      List<Version> versions,
+      int currentVersionNumber,
+      HybridStoreConfig hybridStoreConfig) {
+    if (currentVersionNumber < 1) {
+      return composeRealTimeTopic(storeName);
+    }
+
+    Optional<Version> currentVersion =
+        versions.stream().filter(version -> version.getNumber() == currentVersionNumber).findFirst();
+    if (currentVersion.isPresent() && currentVersion.get().isHybrid()) {
+      String realTimeTopicName = currentVersion.get().getHybridStoreConfig().getRealTimeTopicName();
+      if (Strings.isNotBlank(realTimeTopicName)) {
+        return realTimeTopicName;
+      }
+    }
+
+    if (hybridStoreConfig != null) {
+      String realTimeTopicName = hybridStoreConfig.getRealTimeTopicName();
+      return getRealTimeTopicNameIfEmpty(realTimeTopicName, storeName);
+    }
+
+    Set<String> realTimeTopicNames = new HashSet<>();
+
+    for (Version version: versions) {
+      try {
+        if (version.isHybrid()) {
+          String realTimeTopicName = version.getHybridStoreConfig().getRealTimeTopicName();
+          if (Strings.isNotBlank(realTimeTopicName)) {
+            realTimeTopicNames.add(realTimeTopicName);
+          }
+        }
+      } catch (VeniceException e) {
+        // just try another version
+      }
+    }
+
+    if (realTimeTopicNames.size() > 1) {
+      LOGGER.warn(
+          "Store " + storeName + " and current version are not hybrid, yet " + realTimeTopicNames.size()
+              + " older versions are using real time topics. Will return one of them.");
+    }
+
+    if (!realTimeTopicNames.isEmpty()) {
+      return realTimeTopicNames.iterator().next();
+    }
+
+    return composeRealTimeTopic(storeName);
+  }
+
+  private static String getRealTimeTopicNameIfEmpty(String realTimeTopicName, String storeName) {
+    return Strings.isBlank(realTimeTopicName) ? composeRealTimeTopic(storeName) : realTimeTopicName;
+  }
+
   private static class TimeUnitInfo {
     String suffix;
     int multiplier;
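Taken together, the new resolution helper applies a fixed precedence: the current version's hybrid config wins, then the store-level hybrid config (with blank names falling back to the composed default), then any real-time topic recorded on an older hybrid version, and finally the legacy composeRealTimeTopic(storeName) convention. A minimal usage sketch of the new plumbing end to end; the store name, topic name, and the controllerClient/admin handles are illustrative, not part of this change:

    // Hypothetical sketch: rename the RT topic of an existing hybrid store via
    // the new UpdateStoreQueryParams knob, then resolve it through the helper.
    UpdateStoreQueryParams updateParams = new UpdateStoreQueryParams()
        .setHybridRewindSeconds(86400L)
        .setHybridOffsetLagThreshold(1000L)
        .setRealTimeTopicName("myStore_custom_rt");
    controllerClient.updateStore("myStore", updateParams);

    // Readers and writers of the RT topic now resolve the name instead of
    // assuming the legacy "<storeName>_rt" convention:
    Store store = admin.getStore("venice-cluster0", "myStore");
    String rtTopic = Utils.getRealTimeTopicName(store); // -> "myStore_custom_rt"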
diff --git a/internal/venice-common/src/main/resources/avro/StoreMetaValue/v26/StoreMetaValue.avsc b/internal/venice-common/src/main/resources/avro/StoreMetaValue/v26/StoreMetaValue.avsc
new file mode 100644
index 0000000000..bddad29f57
--- /dev/null
+++ b/internal/venice-common/src/main/resources/avro/StoreMetaValue/v26/StoreMetaValue.avsc
@@ -0,0 +1,402 @@
+{
+  "name": "StoreMetaValue",
+  "namespace": "com.linkedin.venice.systemstore.schemas",
+  "type": "record",
+  "fields": [
+    {
+      "name": "timestamp",
+      "doc": "Timestamp when the value or a partial update for the value was generated by the writer (Venice Controller/Venice Server).",
+      "type": "long",
+      "default": 0
+    },
+    {
+      "name": "storeProperties",
+      "type": [
+        "null",
+        {
+          "name": "StoreProperties",
+          "doc": "This type contains all the store configs and the corresponding versions",
+          "type": "record",
+          "fields": [
+            {"name": "name", "type": "string", "doc": "Store name."},
+            {"name": "owner", "type": "string", "doc": "Owner of this store."},
+            {"name": "createdTime", "type": "long", "doc": "Timestamp when this store was created."},
+            {"name": "currentVersion", "type": "int", "default": 0, "doc": "The number of version which is used currently."},
+            {"name": "partitionCount", "type": "int", "default": 0, "doc": "Default partition count for all of versions in this store. Once first version become online, the number will be assigned."},
+            {"name": "lowWatermark", "type": "long", "default": 0, "doc": "EOIP control message timestamp of the most recent incremental push that has been marked successful"},
+            {"name": "enableWrites", "type": "boolean", "default": true, "doc": "If a store is disabled from writing, new version can not be created for it."},
+            {"name": "enableReads", "type": "boolean", "default": true, "doc": "If a store is disabled from being read, none of versions under this store could serve read requests."},
+            {"name": "storageQuotaInByte", "type": "long", "default": 21474836480, "doc": "Maximum capacity a store version is able to have, and default is 20GB"},
+            {"name": "persistenceType", "type": "int", "default": 2, "doc": "Type of persistence storage engine, and default is 'ROCKS_DB'"},
+            {"name": "routingStrategy", "type": "int", "default": 0, "doc": "How to route the key to partition, and default is 'CONSISTENT_HASH'"},
+            {"name": "readStrategy", "type": "int", "default": 0, "doc": "How to read data from multiple replications, and default is 'ANY_OF_ONLINE'"},
+            {"name": "offlinePushStrategy", "type": "int", "default": 1, "doc": "When doing off-line push, how to decide the data is ready to serve, and default is 'WAIT_N_MINUS_ONE_REPLCIA_PER_PARTITION'"},
+            {"name": "largestUsedVersionNumber", "type": "int", "default": 0, "doc": "The largest version number ever used before for this store."},
+            {"name": "readQuotaInCU", "type": "long", "default": 0, "doc": "Quota for read request hit this store. Measurement is capacity unit."},
+            {
+              "name": "hybridConfig",
+              "doc": "Properties related to Hybrid Store behavior. If absent (null), then the store is not hybrid.",
+              "type": [
+                "null",
+                {
+                  "name": "StoreHybridConfig",
+                  "type": "record",
+                  "fields": [
+                    {"name": "rewindTimeInSeconds", "type": "long"},
+                    {"name": "offsetLagThresholdToGoOnline", "type": "long"},
+                    {"name": "producerTimestampLagThresholdToGoOnlineInSeconds", "type": "long"},
+                    {"name": "dataReplicationPolicy", "type": "int", "default": 0, "doc": "Real-time Samza job data replication policy, and default is 'NON_AGGREGATE'"},
+                    {
+                      "name": "bufferReplayPolicy",
+                      "type": "int",
+                      "doc": "Policy that will be used during buffer replay. rewindTimeInSeconds defines the delta. 0 => REWIND_FROM_EOP (replay from 'EOP - rewindTimeInSeconds'), 1 => REWIND_FROM_SOP (replay from 'SOP - rewindTimeInSeconds')",
+                      "default": 0
+                    },
+                    {"name": "realTimeTopicName", "type": "string", "default": "", "doc": "Name of the real time topic this store/version uses"}
+                  ]
+                }
+              ],
+              "default": null
+            },
+            {
+              "name": "views",
+              "doc": "A map of views which describe and configure a downstream view of a venice store. Keys in this map are for convenience of managing configs.",
+              "type": {
+                "type":"map",
+                "values": {
+                  "name": "StoreViewConfig",
+                  "type": "record",
+                  "doc": "A configuration for a particular view. This config should inform Venice leaders how to transform and transmit data to destination views.",
+                  "fields": [
+                    {
+                      "name": "viewClassName",
+                      "type": "string",
+                      "doc": "This informs what kind of view we are materializing. This then informs what kind of parameters are passed to parse this input. 
This is expected to be a fully formed class path name for materialization.", + "default": "" + }, + { + "name": "viewParameters", + "doc": "Optional parameters to be passed to the given view config.", + "type": ["null", + { + "type": "map", + "java-key-class": "java.lang.String", + "avro.java.string": "String", + "values": { "type": "string", "avro.java.string": "String" } + } + ], + "default": null + } + ] + } + }, + "default": {} + }, + {"name": "accessControlled", "type": "boolean", "default": true, "doc": "Store-level ACL switch. When disabled, Venice Router should accept every request."}, + {"name": "compressionStrategy", "type": "int", "default": 0, "doc": "Strategy used to compress/decompress Record's value, and default is 'NO_OP'"}, + {"name": "clientDecompressionEnabled", "type": "boolean", "default": true, "doc": "le/Disable client-side record decompression (default: true)"}, + {"name": "chunkingEnabled", "type": "boolean", "default": false, "doc": "Whether current store supports large value (typically more than 1MB). By default, the chunking feature is disabled."}, + {"name": "rmdChunkingEnabled", "type": "boolean", "default": false, "doc": "Whether current store supports large replication metadata (typically more than 1MB). By default, the chunking feature is disabled."}, + {"name": "batchGetLimit", "type": "int", "default": -1, "doc": "Batch get key number limit, and Venice will use cluster-level config if it is not positive."}, + {"name": "numVersionsToPreserve", "type": "int", "default": 0, "doc": "How many versions this store preserve at most. By default it's 0 means we use the cluster level config to determine how many version is preserved."}, + {"name": "incrementalPushEnabled", "type": "boolean", "default": false, "doc": "Flag to see if the store supports incremental push or not"}, + {"name": "separateRealTimeTopicEnabled", "type": "boolean", "default": false, "doc": "Flag to see if the store supports separate real-time topic for incremental push."}, + {"name": "migrating", "type": "boolean", "default": false, "doc": "Whether or not the store is in the process of migration."}, + {"name": "writeComputationEnabled", "type": "boolean", "default": false, "doc": "Whether or not write-path computation feature is enabled for this store."}, + {"name": "readComputationEnabled", "type": "boolean", "default": false, "doc": "Whether read-path computation is enabled for this store."}, + {"name": "bootstrapToOnlineTimeoutInHours", "type": "int", "default": 24, "doc": "Maximum number of hours allowed for the store to transition from bootstrap to online state."}, + {"name": "leaderFollowerModelEnabled", "type": "boolean", "default": false, "doc": "Whether or not to use leader follower state transition model for upcoming version."}, + {"name": "nativeReplicationEnabled", "type": "boolean", "default": false, "doc": "Whether or not native should be enabled for this store. Will only successfully apply if leaderFollowerModelEnabled is also true either in this update or a previous version of the store."}, + {"name": "replicationMetadataVersionID", "type": "int", "default": -1, "doc": "RMD (Replication metadata) version ID on the store-level. 
Default -1 means NOT_SET and the cluster-level RMD version ID should be used for stores."}, + {"name": "pushStreamSourceAddress", "type": "string", "default": "", "doc": "Address to the kafka broker which holds the source of truth topic for this store version."}, + {"name": "backupStrategy", "type": "int", "default": 1, "doc": "Strategies to store backup versions, and default is 'DELETE_ON_NEW_PUSH_START'"}, + {"name": "schemaAutoRegisteFromPushJobEnabled", "type": "boolean", "default": false, "doc": "Whether or not value schema auto registration enabled from push job for this store."}, + {"name": "latestSuperSetValueSchemaId", "type": "int", "default": -1, "doc": "For read compute stores with auto super-set schema enabled, stores the latest super-set value schema ID."}, + {"name": "hybridStoreDiskQuotaEnabled", "type": "boolean", "default": false, "doc": "Whether or not storage disk quota is enabled for a hybrid store. This store config cannot be enabled until the routers and servers in the corresponding cluster are upgraded to the right version: 0.2.249 or above for routers and servers."}, + {"name": "storeMetadataSystemStoreEnabled", "type": "boolean", "default": false, "doc": "Whether or not the store metadata system store is enabled for this store."}, + { + "name": "etlConfig", + "doc": "Properties related to ETL Store behavior.", + "type": [ + "null", + { + "name": "StoreETLConfig", + "type": "record", + "fields": [ + {"name": "etledUserProxyAccount", "type": "string", "doc": "If enabled regular ETL or future version ETL, this account name is part of path for where the ETLed snapshots will go. for example, for user account veniceetl001, snapshots will be published to HDFS /jobs/veniceetl001/storeName."}, + {"name": "regularVersionETLEnabled", "type": "boolean", "doc": "Whether or not enable regular version ETL for this store."}, + {"name": "futureVersionETLEnabled", "type": "boolean", "doc": "Whether or not enable future version ETL - the version that might come online in future - for this store."} + ] + } + ], + "default": null + }, + { + "name": "partitionerConfig", + "doc": "", + "type": [ + "null", + { + "name": "StorePartitionerConfig", + "type": "record", + "fields": [ + {"name": "partitionerClass", "type": "string"}, + {"name": "partitionerParams", "type": {"type": "map", "values": "string"}}, + {"name": "amplificationFactor", "type": "int"} + ] + } + ], + "default": null + }, + {"name": "incrementalPushPolicy", "type": "int", "default": 0, "doc": "Incremental Push Policy to reconcile with real time pushes, and default is 'PUSH_TO_VERSION_TOPIC'"}, + {"name": "latestVersionPromoteToCurrentTimestamp", "type": "long", "default": -1, "doc": "This is used to track the time when a new version is promoted to current version. For now, it is mostly to decide whether a backup version can be removed or not based on retention. For the existing store before this code change, it will be set to be current timestamp."}, + {"name": "backupVersionRetentionMs", "type": "long", "default": -1, "doc": "Backup retention time, and if it is not set (-1), Venice Controller will use the default configured retention. 
{@link com.linkedin.venice.ConfigKeys#CONTROLLER_BACKUP_VERSION_DEFAULT_RETENTION_MS}."}, + {"name": "replicationFactor", "type": "int", "default": 3, "doc": "The number of replica each store version will keep."}, + {"name": "migrationDuplicateStore", "type": "boolean", "default": false, "doc": "Whether or not the store is a duplicate store in the process of migration."}, + {"name": "nativeReplicationSourceFabric", "type": "string", "default": "", "doc": "The source fabric name to be uses in native replication. Remote consumption will happen from kafka in this fabric."}, + {"name": "daVinciPushStatusStoreEnabled", "type": "boolean", "default": false, "doc": "Whether or not davinci push status store is enabled."}, + {"name": "storeMetaSystemStoreEnabled", "type": "boolean", "default": false, "doc": "Whether or not the store meta system store is enabled for this store."}, + {"name": "activeActiveReplicationEnabled", "type": "boolean", "default": false, "doc": "Whether or not active/active replication is enabled for hybrid stores; eventually this config will replace native replication flag, when all stores are on A/A"}, + {"name": "applyTargetVersionFilterForIncPush", "type": "boolean", "default": false, "doc": "Whether or not the target version field in Kafka messages will be used in increment push to RT policy"}, + {"name": "minCompactionLagSeconds", "type": "long", "default": -1, "doc": "Store level min compaction lag config and if not specified, it will use the global config for version topics"}, + {"name": "maxCompactionLagSeconds", "type": "long", "default": -1, "doc": "Store level max compaction lag config and if not specified, 'max.compaction.lag.ms' config won't be setup in the corresponding version topics"}, + {"name": "maxRecordSizeBytes", "type": "int", "default": -1, "doc": "Store-level max record size in bytes. If not specified (-1), the controller config 'default.max.record.size.bytes' (100MB default) will be backfilled"}, + {"name": "maxNearlineRecordSizeBytes", "type": "int", "default": -1, "doc": "Store-level max record size in bytes for nearline jobs with partial updates. If not specified (-1), the server config 'default.max.record.size.bytes' (100MB default) will be backfilled. This may converge with maxRecordSizeBytes in the future"}, + {"name": "unusedSchemaDeletionEnabled", "type": "boolean", "default": false, "doc": "Store level config to indicate whether unused schema deletion is enabled or not."}, + { + "name": "versions", + "doc": "List of non-retired versions. It's currently sorted and there is code run under the assumption that the last element in the list is the largest. 
Check out {VeniceHelixAdmin#getIncrementalPushVersion}, and please make it in mind if you want to change this logic", + "type": { + "type": "array", + "items": { + "name": "StoreVersion", + "type": "record", + "doc": "Type describes all the version attributes", + "fields": [ + {"name": "storeName", "type": "string", "doc": "Name of the store which this version belong to."}, + {"name": "number", "type": "int", "doc": "Version number."}, + {"name": "createdTime", "type": "long", "doc": "Time when this version was created."}, + {"name": "status", "type": "int", "default": 1, "doc": "Status of version, and default is 'STARTED'"}, + {"name": "pushJobId", "type": "string", "default": ""}, + {"name": "compressionStrategy", "type": "int", "default": 0, "doc": "strategies used to compress/decompress Record's value, and default is 'NO_OP'"}, + {"name": "leaderFollowerModelEnabled", "type": "boolean", "default": false, "doc": "Whether or not to use leader follower state transition."}, + {"name": "nativeReplicationEnabled", "type": "boolean", "default": false, "doc": "Whether or not native replication is enabled."}, + {"name": "pushStreamSourceAddress", "type": "string", "default": "", "doc": "Address to the kafka broker which holds the source of truth topic for this store version."}, + {"name": "bufferReplayEnabledForHybrid", "type": "boolean", "default": true, "doc": "Whether or not to enable buffer replay for hybrid."}, + {"name": "chunkingEnabled", "type": "boolean", "default": false, "doc": "Whether or not large values are supported (via chunking)."}, + {"name": "rmdChunkingEnabled", "type": "boolean", "default": false, "doc": "Whether or not large replication metadata are supported (via chunking)."}, + {"name": "pushType", "type": "int", "default": 0, "doc": "Producer type for this version, and default is 'BATCH'"}, + {"name": "partitionCount", "type": "int", "default": 0, "doc": "Partition count of this version."}, + { + "name": "partitionerConfig", + "type": [ + "null", + "com.linkedin.venice.systemstore.schemas.StorePartitionerConfig" + ], + "default": null, + "doc": "Config for custom partitioning." + }, + {"name": "incrementalPushPolicy", "type": "int", "default": 0, "doc": "Incremental Push Policy to reconcile with real time pushes., and default is 'PUSH_TO_VERSION_TOPIC'"}, + {"name": "replicationFactor", "type": "int", "default": 3, "doc": "The number of replica this store version is keeping."}, + {"name": "nativeReplicationSourceFabric", "type": "string", "default": "", "doc": "The source fabric name to be uses in native replication. Remote consumption will happen from kafka in this fabric."}, + {"name": "incrementalPushEnabled", "type": "boolean", "default": false, "doc": "Flag to see if the store supports incremental push or not"}, + {"name": "separateRealTimeTopicEnabled", "type": "boolean", "default": false, "doc": "Flag to see if the store supports separate real-time topic for incremental push."}, + {"name": "blobTransferEnabled", "type": "boolean", "default": false, "doc": "Flag to indicate if the blob transfer is allowed or not"}, + {"name": "useVersionLevelIncrementalPushEnabled", "type": "boolean", "default": false, "doc": "Flag to see if incrementalPushEnabled config at StoreVersion should be used. This is needed during migration of this config from Store level to Version level. 
We can deprecate this field later."}, + { + "name": "hybridConfig", + "type": [ + "null", + "com.linkedin.venice.systemstore.schemas.StoreHybridConfig" + ], + "default": null, + "doc": "Properties related to Hybrid Store behavior. If absent (null), then the store is not hybrid." + }, + {"name": "useVersionLevelHybridConfig", "type": "boolean", "default": false, "doc": "Flag to see if hybridConfig at StoreVersion should be used. This is needed during migration of this config from Store level to Version level. We can deprecate this field later."}, + {"name": "activeActiveReplicationEnabled", "type": "boolean", "default": false, "doc": "Whether or not active/active replication is enabled for hybrid stores; eventually this config will replace native replication flag, when all stores are on A/A"}, + {"name": "timestampMetadataVersionId", "type": "int", "default": -1, "doc": "The A/A timestamp metadata schema version ID that will be used to deserialize metadataPayload."}, + { + "name": "dataRecoveryConfig", + "type": [ + "null", + { + "name": "DataRecoveryConfig", + "type": "record", + "fields": [ + {"name": "dataRecoverySourceFabric", "type": "string", "doc": "The fabric name to be used as the source for data recovery."}, + {"name": "isDataRecoveryComplete", "type": "boolean", "doc": "Whether or not data recovery is complete."}, + {"name": "dataRecoverySourceVersionNumber", "type": "int", "default": 0, "doc": "The store version number to be used as the source for data recovery."} + ] + } + ], + "default": null, + "doc": "Properties related to data recovery mode behavior for this version. If absent (null), then the version never went go through data recovery." + }, + {"name": "deferVersionSwap", "type": "boolean", "default": false, "doc": "flag that informs venice controller to defer marking this version as the serving version after instances report ready to serve. 
This version must be marked manually as the current version in order to serve traffic from it."}, + { + "name": "views", + "doc": "A list of views which describe and configure a downstream view of a venice store.", + "type": { + "type": "map", + "java-key-class": "java.lang.String", + "avro.java.string": "String", + "values": "com.linkedin.venice.systemstore.schemas.StoreViewConfig" + }, + "default": {} + }, + {"name": "repushSourceVersion", "type": "int", "default": -1, "doc": "For store version created from repush, indicates the source store version its created from."} + ] + } + }, + "default": [] + }, + { + "name": "systemStores", + "doc": "This field is used to maintain a mapping between each type of system store and the corresponding distinct properties", + "type": { + "type": "map", + "values": { + "name": "SystemStoreProperties", + "type": "record", + "doc": "This type describes all the distinct properties", + "fields": [ + {"name": "largestUsedVersionNumber", "type": "int", "default": 0}, + {"name": "currentVersion", "type": "int", "default": 0}, + {"name": "latestVersionPromoteToCurrentTimestamp", "type": "long", "default": -1}, + {"name": "versions", "type": {"type": "array", "items": "com.linkedin.venice.systemstore.schemas.StoreVersion"}, "default": []} + ] + } + }, + "default": {} + }, + {"name": "storageNodeReadQuotaEnabled", "type": "boolean", "default": false, "doc": "Controls the storage node read quota enforcement for the given Venice store"}, + {"name": "blobTransferEnabled", "type": "boolean", "default": false, "doc": "Flag to indicate if the blob transfer is allowed or not"}, + {"name": "nearlineProducerCompressionEnabled", "type": "boolean", "default": true, "doc": "Flag to control whether the producer in Server for near-line workload will enable compression or not"}, + {"name": "nearlineProducerCountPerWriter", "type": "int", "default": 1, "doc": "How many producers will be used for the nearline producer in Server to improve producing throughput"} + ] + } + ], + "default": null + }, + { + "name": "storeKeySchemas", + "doc": "", + "type": [ + "null", + { + "name": "StoreKeySchemas", + "doc": "This type describes the key schemas of the store", + "type": "record", + "fields": [ + { + "name": "keySchemaMap", + "doc": "A string to string map representing the mapping from id to key schema.", + "type": { + "type": "map", + "values": "string" + } + } + ] + } + ], + "default": null + }, + { + "name": "storeValueSchemas", + "doc": "", + "type": [ + "null", + { + "name": "StoreValueSchemas", + "doc": "This type describes the value schemas of the store.", + "type": "record", + "fields": [ + { + "name": "valueSchemaMap", + "doc": "A string to string map representing the mapping from schema id to value schema string. 
The value could be an empty string indicating the value schema is stored in another field.", + "type": { + "type": "map", + "values": "string" + } + } + ] + } + ], + "default": null + }, + { + "name": "storeValueSchema", + "doc": "", + "type": [ + "null", + { + "name": "StoreValueSchema", + "doc": "This type describes a single version of the value schema of the store.", + "type": "record", + "fields": [ + { + "name": "valueSchema", + "doc": "Store value schema string.", + "type": "string", + "default": "" + } + ] + } + ], + "default": null + }, + { + "name": "storeReplicaStatuses", + "doc": "This field describes the replica statuses per version per partition, and the mapping is 'host_port' -> 'replica status'", + "type": [ + "null", + { + "type": "map", + "values": { + "name": "StoreReplicaStatus", + "type": "record", + "doc": "This structure will contain all kinds of info related to one replica", + "fields": [ + {"name": "status", "type": "int", "doc": "replica status"} + ] + } + } + ], + "default": null + }, + { + "name": "storeValueSchemaIdsWrittenPerStoreVersion", + "doc": "This field described the set of value schemas id written by a store version.", + "type": [ + "null", + { + "name": "StoreValueSchemaIdsWrittenPerStoreVersion", + "doc": "This type describes value schema IDs written by the store version.", + "type": "array", + "items": "int" + } + ], + "default": null + }, + { + "name": "storeClusterConfig", + "doc": "This is the Zk's StoreConfig equivalent which contains various Venice cluster information", + "type": [ + "null", + { + "name": "StoreClusterConfig", + "doc": "This type describes the various Venice cluster information for a store", + "type": "record", + "fields": [ + {"name": "cluster", "type": "string", "default": "", "doc": "The Venice cluster of the store."}, + {"name": "deleting", "type": "boolean", "default": false, "doc": "Is the store undergoing deletion."}, + {"name": "migrationDestCluster", "type": ["null", "string"], "default": null, "doc": "The destination cluster for store migration"}, + {"name": "migrationSrcCluster", "type": ["null", "string"], "default": null, "doc": "The source cluster for store migration"}, + {"name": "storeName", "type": "string", "default": "", "doc": "The name of the store"} + ] + } + ], + "default": null + } + ] +} diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/utils/UtilsTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/utils/UtilsTest.java index 79f2fca4d5..f04f6369b4 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/utils/UtilsTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/utils/UtilsTest.java @@ -1,16 +1,23 @@ package com.linkedin.venice.utils; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.expectThrows; import static org.testng.Assert.fail; import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.meta.HybridStoreConfig; +import com.linkedin.venice.meta.Store; +import com.linkedin.venice.meta.StoreInfo; +import com.linkedin.venice.meta.Version; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -19,6 +26,7 @@ import java.util.TreeMap; import org.testng.Assert; import 
org.testng.annotations.Test;
+import org.testng.collections.Lists;
 
 /**
@@ -229,4 +237,111 @@ public void testResolveKafkaUrlForSepTopic() {
     Assert.assertEquals(Utils.resolveKafkaUrlForSepTopic(originalKafkaUrlForSep), originalKafkaUrl);
     Assert.assertEquals(Utils.resolveKafkaUrlForSepTopic(originalKafkaUrl), originalKafkaUrl);
   }
+
+  @Test
+  void testGetRealTimeTopicNameWithStore() {
+    Store mockStore = mock(Store.class);
+    List<Version> mockVersions = Collections.singletonList(mock(Version.class));
+    HybridStoreConfig mockHybridConfig = mock(HybridStoreConfig.class);
+
+    when(mockStore.getName()).thenReturn("TestStore");
+    when(mockStore.getVersions()).thenReturn(mockVersions);
+    when(mockStore.getCurrentVersion()).thenReturn(1);
+    when(mockStore.getHybridStoreConfig()).thenReturn(mockHybridConfig);
+
+    when(mockHybridConfig.getRealTimeTopicName()).thenReturn("RealTimeTopic");
+
+    String result = Utils.getRealTimeTopicName(mockStore);
+    assertEquals("RealTimeTopic", result);
+  }
+
+  @Test
+  void testGetRealTimeTopicNameWithStoreInfo() {
+    StoreInfo mockStoreInfo = mock(StoreInfo.class);
+    List<Version> mockVersions = Collections.singletonList(mock(Version.class));
+    HybridStoreConfig mockHybridConfig = mock(HybridStoreConfig.class);
+
+    when(mockStoreInfo.getName()).thenReturn("TestStore");
+    when(mockStoreInfo.getVersions()).thenReturn(mockVersions);
+    when(mockStoreInfo.getCurrentVersion()).thenReturn(1);
+    when(mockStoreInfo.getHybridStoreConfig()).thenReturn(mockHybridConfig);
+
+    when(mockHybridConfig.getRealTimeTopicName()).thenReturn("RealTimeTopic");
+
+    String result = Utils.getRealTimeTopicName(mockStoreInfo);
+    assertEquals("RealTimeTopic", result);
+  }
+
+  @Test
+  void testGetRealTimeTopicNameWithHybridConfig() {
+    HybridStoreConfig mockHybridConfig = mock(HybridStoreConfig.class);
+
+    when(mockHybridConfig.getRealTimeTopicName()).thenReturn("RealTimeTopic");
+    String result = Utils.getRealTimeTopicName("TestStore", Collections.EMPTY_LIST, 1, mockHybridConfig);
+
+    assertEquals("RealTimeTopic", result);
+  }
+
+  @Test
+  void testGetRealTimeTopicNameWithoutHybridConfig() {
+    String result = Utils.getRealTimeTopicName("TestStore", Collections.EMPTY_LIST, 0, null);
+    assertEquals("TestStore" + Version.REAL_TIME_TOPIC_SUFFIX, result);
+  }
+
+  @Test
+  void testGetRealTimeTopicNameWithConflictingVersions() {
+    Version mockVersion1 = mock(Version.class);
+    Version mockVersion2 = mock(Version.class);
+    HybridStoreConfig mockConfig1 = mock(HybridStoreConfig.class);
+    HybridStoreConfig mockConfig2 = mock(HybridStoreConfig.class);
+
+    when(mockVersion1.isHybrid()).thenReturn(true);
+    when(mockVersion2.isHybrid()).thenReturn(true);
+    when(mockVersion1.getHybridStoreConfig()).thenReturn(mockConfig1);
+    when(mockVersion2.getHybridStoreConfig()).thenReturn(mockConfig2);
+    when(mockConfig1.getRealTimeTopicName()).thenReturn("RealTimeTopic1");
+    when(mockConfig2.getRealTimeTopicName()).thenReturn("RealTimeTopic2");
+
+    String result = Utils.getRealTimeTopicName("TestStore", Lists.newArrayList(mockVersion1, mockVersion2), 1, null);
+    assertTrue(result.equals("RealTimeTopic1") || result.equals("RealTimeTopic2"));
+  }
+
+  @Test
+  void testGetRealTimeTopicNameWithExceptionHandling() {
+    Version mockVersion1 = mock(Version.class);
+    Version mockVersion2 = mock(Version.class);
+
+    when(mockVersion1.isHybrid()).thenReturn(true);
+    when(mockVersion1.getHybridStoreConfig()).thenThrow(new VeniceException("Test Exception"));
+
+    when(mockVersion2.isHybrid()).thenReturn(false);
+
+    String result =
Utils.getRealTimeTopicName("TestStore", Lists.newArrayList(mockVersion1, mockVersion2), 1, null); + assertEquals("TestStore" + Version.REAL_TIME_TOPIC_SUFFIX, result); + } + + @Test + void testGetRealTimeTopicNameWithVersion() { + Version mockVersion = mock(Version.class); + HybridStoreConfig mockHybridConfig = mock(HybridStoreConfig.class); + + when(mockVersion.getHybridStoreConfig()).thenReturn(mockHybridConfig); + when(mockVersion.getStoreName()).thenReturn("TestStore"); + when(mockHybridConfig.getRealTimeTopicName()).thenReturn("RealTimeTopic"); + + String result = Utils.getRealTimeTopicName(mockVersion); + assertEquals("RealTimeTopic", result); + } + + @Test + void testGetRealTimeTopicNameWithNonHybridVersion() { + // Mocking the Version object + Version mockVersion = mock(Version.class); + + // Mock setup to trigger the exception path + when(mockVersion.getHybridStoreConfig()).thenReturn(null); + when(mockVersion.getStoreName()).thenReturn("TestStore"); + String result = Utils.getRealTimeTopicName(mockVersion); + assertEquals("TestStore" + Version.REAL_TIME_TOPIC_SUFFIX, result); + } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index 9c65c13ede..c426c45091 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -9,6 +9,7 @@ import static com.linkedin.venice.controller.UserSystemStoreLifeCycleHelper.AUTO_META_SYSTEM_STORE_PUSH_ID_PREFIX; import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_HYBRID_OFFSET_LAG_THRESHOLD; import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_HYBRID_TIME_LAG_THRESHOLD; +import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_REAL_TIME_TOPIC_NAME; import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_REWIND_TIME_IN_SECONDS; import static com.linkedin.venice.meta.Store.NON_EXISTING_VERSION; import static com.linkedin.venice.meta.Version.PushType; @@ -4779,6 +4780,7 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto Optional hybridTimeLagThreshold = params.getHybridTimeLagThreshold(); Optional hybridDataReplicationPolicy = params.getHybridDataReplicationPolicy(); Optional hybridBufferReplayPolicy = params.getHybridBufferReplayPolicy(); + Optional realTimeTopicName = params.getRealTimeTopicName(); Optional accessControlled = params.getAccessControlled(); Optional compressionStrategy = params.getCompressionStrategy(); Optional clientDecompressionEnabled = params.getClientDecompressionEnabled(); @@ -4828,7 +4830,8 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto hybridOffsetLagThreshold, hybridTimeLagThreshold, hybridDataReplicationPolicy, - hybridBufferReplayPolicy); + hybridBufferReplayPolicy, + realTimeTopicName); newHybridStoreConfig = Optional.ofNullable(hybridConfig); } else { newHybridStoreConfig = Optional.empty(); @@ -5160,7 +5163,8 @@ private void enableHybridModeOrUpdateSettings(String clusterName, String storeNa hybridStoreConfig.getOffsetLagThresholdToGoOnline(), hybridStoreConfig.getProducerTimestampLagThresholdToGoOnlineInSeconds(), DataReplicationPolicy.NON_AGGREGATE, - hybridStoreConfig.getBufferReplayPolicy())); + hybridStoreConfig.getBufferReplayPolicy(), + hybridStoreConfig.getRealTimeTopicName())); } return 
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java
index 9c65c13ede..c426c45091 100644
--- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java
@@ -9,6 +9,7 @@
 import static com.linkedin.venice.controller.UserSystemStoreLifeCycleHelper.AUTO_META_SYSTEM_STORE_PUSH_ID_PREFIX;
 import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_HYBRID_OFFSET_LAG_THRESHOLD;
 import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_HYBRID_TIME_LAG_THRESHOLD;
+import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_REAL_TIME_TOPIC_NAME;
 import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_REWIND_TIME_IN_SECONDS;
 import static com.linkedin.venice.meta.Store.NON_EXISTING_VERSION;
 import static com.linkedin.venice.meta.Version.PushType;
@@ -4779,6 +4780,7 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto
     Optional<Long> hybridTimeLagThreshold = params.getHybridTimeLagThreshold();
     Optional<DataReplicationPolicy> hybridDataReplicationPolicy = params.getHybridDataReplicationPolicy();
     Optional<BufferReplayPolicy> hybridBufferReplayPolicy = params.getHybridBufferReplayPolicy();
+    Optional<String> realTimeTopicName = params.getRealTimeTopicName();
     Optional<Boolean> accessControlled = params.getAccessControlled();
     Optional<CompressionStrategy> compressionStrategy = params.getCompressionStrategy();
     Optional<Boolean> clientDecompressionEnabled = params.getClientDecompressionEnabled();
@@ -4828,7 +4830,8 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto
             hybridOffsetLagThreshold,
             hybridTimeLagThreshold,
             hybridDataReplicationPolicy,
-            hybridBufferReplayPolicy);
+            hybridBufferReplayPolicy,
+            realTimeTopicName);
         newHybridStoreConfig = Optional.ofNullable(hybridConfig);
       } else {
         newHybridStoreConfig = Optional.empty();
       }
@@ -5160,7 +5163,8 @@ private void enableHybridModeOrUpdateSettings(String clusterName, String storeNa
               hybridStoreConfig.getOffsetLagThresholdToGoOnline(),
               hybridStoreConfig.getProducerTimestampLagThresholdToGoOnlineInSeconds(),
               DataReplicationPolicy.NON_AGGREGATE,
-              hybridStoreConfig.getBufferReplayPolicy()));
+              hybridStoreConfig.getBufferReplayPolicy(),
+              hybridStoreConfig.getRealTimeTopicName()));
       }
       return store;
     });
@@ -5222,7 +5226,8 @@ protected static HybridStoreConfig mergeNewSettingsIntoOldHybridStoreConfig(
     Optional<Long> hybridOffsetLagThreshold,
     Optional<Long> hybridTimeLagThreshold,
     Optional<DataReplicationPolicy> hybridDataReplicationPolicy,
-    Optional<BufferReplayPolicy> bufferReplayPolicy) {
+    Optional<BufferReplayPolicy> bufferReplayPolicy,
+    Optional<String> realTimeTopicName) {
   if (!hybridRewindSeconds.isPresent() && !hybridOffsetLagThreshold.isPresent() && !oldStore.isHybrid()) {
     return null; // For the nullable union in the avro record
   }
@@ -5240,7 +5245,8 @@ protected static HybridStoreConfig mergeNewSettingsIntoOldHybridStoreConfig(
         hybridDataReplicationPolicy.isPresent()
             ? hybridDataReplicationPolicy.get()
             : oldHybridConfig.getDataReplicationPolicy(),
-        bufferReplayPolicy.isPresent() ? bufferReplayPolicy.get() : oldHybridConfig.getBufferReplayPolicy());
+        bufferReplayPolicy.isPresent() ? bufferReplayPolicy.get() : oldHybridConfig.getBufferReplayPolicy(),
+        realTimeTopicName.orElseGet(oldHybridConfig::getRealTimeTopicName));
   } else {
     // switching a non-hybrid store to hybrid; must specify:
     // 1. rewind time
@@ -5258,7 +5264,8 @@ protected static HybridStoreConfig mergeNewSettingsIntoOldHybridStoreConfig(
         hybridOffsetLagThreshold.orElse(DEFAULT_HYBRID_OFFSET_LAG_THRESHOLD),
         hybridTimeLagThreshold.orElse(DEFAULT_HYBRID_TIME_LAG_THRESHOLD),
         hybridDataReplicationPolicy.orElse(DataReplicationPolicy.NON_AGGREGATE),
-        bufferReplayPolicy.orElse(BufferReplayPolicy.REWIND_FROM_EOP));
+        bufferReplayPolicy.orElse(BufferReplayPolicy.REWIND_FROM_EOP),
+        realTimeTopicName.orElse(DEFAULT_REAL_TIME_TOPIC_NAME));
   }
   if (mergedHybridStoreConfig.getRewindTimeInSeconds() > 0
       && mergedHybridStoreConfig.getOffsetLagThresholdToGoOnline() < 0
@@ -7825,7 +7832,8 @@ boolean isHybrid(HybridStoreConfigRecord hybridStoreConfigRecord) {
         hybridStoreConfigRecord.offsetLagThresholdToGoOnline,
         hybridStoreConfigRecord.producerTimestampLagThresholdToGoOnlineInSeconds,
         DataReplicationPolicy.valueOf(hybridStoreConfigRecord.dataReplicationPolicy),
-        BufferReplayPolicy.valueOf(hybridStoreConfigRecord.bufferReplayPolicy));
+        BufferReplayPolicy.valueOf(hybridStoreConfigRecord.bufferReplayPolicy),
+        hybridStoreConfigRecord.realTimeTopicName.toString());
   }
   return isHybrid(hybridStoreConfig);
 }
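Note the asymmetry in mergeNewSettingsIntoOldHybridStoreConfig: for an already-hybrid store an absent realTimeTopicName parameter preserves the previously stored name, while switching a non-hybrid store to hybrid falls back to the DEFAULT_REAL_TIME_TOPIC_NAME sentinel. A condensed sketch of the rename path; the store handle and names are illustrative:

    // Store is already hybrid (say with RT topic "myStore_rt"); only the topic
    // name is updated, every other hybrid knob keeps its stored value.
    HybridStoreConfig merged = VeniceHelixAdmin.mergeNewSettingsIntoOldHybridStoreConfig(
        store,                              // existing hybrid store
        Optional.empty(),                   // rewind time: keep old value
        Optional.empty(),                   // offset lag threshold: keep old value
        Optional.empty(),                   // time lag threshold: keep old value
        Optional.empty(),                   // data replication policy: keep old value
        Optional.empty(),                   // buffer replay policy: keep old value
        Optional.of("myStore_custom_rt"));  // new real-time topic name
    // merged.getRealTimeTopicName() is "myStore_custom_rt"; all other fields
    // carry over from the store's existing hybrid config.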
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java
index 5020f60f6a..38eb8f9bba 100644
--- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java
@@ -46,6 +46,7 @@
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.PUSH_STREAM_SOURCE_ADDRESS;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.READ_COMPUTATION_ENABLED;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.READ_QUOTA_IN_CU;
+import static com.linkedin.venice.controllerapi.ControllerApiConstants.REAL_TIME_TOPIC_NAME;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.REGULAR_VERSION_ETL_ENABLED;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.REPLICATION_FACTOR;
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.REPLICATION_METADATA_PROTOCOL_VERSION_ID;
@@ -62,6 +63,7 @@
 import static com.linkedin.venice.controllerapi.ControllerApiConstants.WRITE_COMPUTATION_ENABLED;
 import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_HYBRID_OFFSET_LAG_THRESHOLD;
 import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_HYBRID_TIME_LAG_THRESHOLD;
+import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_REAL_TIME_TOPIC_NAME;
 import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_REWIND_TIME_IN_SECONDS;
 import static com.linkedin.venice.meta.Version.VERSION_SEPARATOR;
 import static com.linkedin.venice.meta.VersionStatus.ONLINE;
@@ -2233,6 +2235,7 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa
     Optional<Long> hybridTimeLagThreshold = params.getHybridTimeLagThreshold();
     Optional<DataReplicationPolicy> hybridDataReplicationPolicy = params.getHybridDataReplicationPolicy();
     Optional<BufferReplayPolicy> hybridBufferReplayPolicy = params.getHybridBufferReplayPolicy();
+    Optional<String> realTimeTopicName = params.getRealTimeTopicName();
     Optional<Boolean> accessControlled = params.getAccessControlled();
     Optional<CompressionStrategy> compressionStrategy = params.getCompressionStrategy();
     Optional<Boolean> clientDecompressionEnabled = params.getClientDecompressionEnabled();
@@ -2418,13 +2421,15 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa
       hybridTimeLagThreshold.map(addToUpdatedConfigList(updatedConfigsList, TIME_LAG_TO_GO_ONLINE));
       hybridDataReplicationPolicy.map(addToUpdatedConfigList(updatedConfigsList, DATA_REPLICATION_POLICY));
       hybridBufferReplayPolicy.map(addToUpdatedConfigList(updatedConfigsList, BUFFER_REPLAY_POLICY));
+      realTimeTopicName.map(addToUpdatedConfigList(updatedConfigsList, REAL_TIME_TOPIC_NAME));
 
       HybridStoreConfig updatedHybridStoreConfig = VeniceHelixAdmin.mergeNewSettingsIntoOldHybridStoreConfig(
           currStore,
           hybridRewindSeconds,
           hybridOffsetLagThreshold,
           hybridTimeLagThreshold,
           hybridDataReplicationPolicy,
-          hybridBufferReplayPolicy);
+          hybridBufferReplayPolicy,
+          realTimeTopicName);
 
       // Get VeniceControllerClusterConfig for the cluster
       VeniceControllerClusterConfig controllerConfig =
@@ -2499,6 +2504,7 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa
             updatedHybridStoreConfig.getProducerTimestampLagThresholdToGoOnlineInSeconds();
         hybridStoreConfigRecord.dataReplicationPolicy = updatedHybridStoreConfig.getDataReplicationPolicy().getValue();
         hybridStoreConfigRecord.bufferReplayPolicy = updatedHybridStoreConfig.getBufferReplayPolicy().getValue();
+        hybridStoreConfigRecord.realTimeTopicName = updatedHybridStoreConfig.getRealTimeTopicName();
         setStore.hybridStoreConfig = hybridStoreConfigRecord;
       }
@@ -2519,6 +2525,7 @@ public void updateStore(String clusterName, String storeName, UpdateStoreQueryPa
         updatedConfigsList.add(DATA_REPLICATION_POLICY);
         hybridStoreConfigRecord.bufferReplayPolicy = BufferReplayPolicy.REWIND_FROM_EOP.getValue();
         updatedConfigsList.add(BUFFER_REPLAY_POLICY);
+        hybridStoreConfigRecord.realTimeTopicName = DEFAULT_REAL_TIME_TOPIC_NAME;
         setStore.hybridStoreConfig = hybridStoreConfigRecord;
       }
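Between parent and child controllers the name travels as the new realTimeTopicName field of HybridStoreConfigRecord in the v83 AdminOperation schema below. Because Avro surfaces string fields as CharSequence, the consuming side converts with toString() before rebuilding the config (see the isHybrid(HybridStoreConfigRecord) hunk above). A small sketch of that round trip, with illustrative values:

    // Parent controller side: populate the admin-channel record.
    HybridStoreConfigRecord record = new HybridStoreConfigRecord();
    record.rewindTimeInSeconds = 86400L;
    record.offsetLagThresholdToGoOnline = 1000L;
    record.producerTimestampLagThresholdToGoOnlineInSeconds = -1L;
    record.dataReplicationPolicy = DataReplicationPolicy.NON_AGGREGATE.getValue();
    record.bufferReplayPolicy = BufferReplayPolicy.REWIND_FROM_EOP.getValue();
    record.realTimeTopicName = "myStore_custom_rt";

    // Child controller side: Avro hands back a CharSequence, so convert explicitly.
    String rtName = record.realTimeTopicName.toString(); // "myStore_custom_rt"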
diff --git a/services/venice-controller/src/main/resources/avro/AdminOperation/v83/AdminOperation.avsc b/services/venice-controller/src/main/resources/avro/AdminOperation/v83/AdminOperation.avsc
new file mode 100644
index 0000000000..732f80b80c
--- /dev/null
+++ b/services/venice-controller/src/main/resources/avro/AdminOperation/v83/AdminOperation.avsc
@@ -0,0 +1,1147 @@
+{
+  "name": "AdminOperation",
+  "namespace": "com.linkedin.venice.controller.kafka.protocol.admin",
+  "type": "record",
+  "fields": [
+    {
+      "name": "operationType",
+      "doc": "0 => StoreCreation, 1 => ValueSchemaCreation, 2 => PauseStore, 3 => ResumeStore, 4 => KillOfflinePushJob, 5 => DisableStoreRead, 6 => EnableStoreRead, 7=> DeleteAllVersions, 8=> SetStoreOwner, 9=> SetStorePartitionCount, 10=> SetStoreCurrentVersion, 11=> UpdateStore, 12=> DeleteStore, 13=> DeleteOldVersion, 14=> MigrateStore, 15=> AbortMigration, 16=>AddVersion, 17=> DerivedSchemaCreation, 18=>SupersetSchemaCreation, 19=>EnableNativeReplicationForCluster, 20=>MetadataSchemaCreation, 21=>EnableActiveActiveReplicationForCluster, 25=>CreatePersona, 26=>DeletePersona, 27=>UpdatePersona, 28=>RollbackCurrentVersion, 29=>RollforwardCurrentVersion",
+      "type": "int"
+    }, {
+      "name": "executionId",
+      "doc": "ID of a command execution which is used to query the status of this command.",
+      "type": "long",
+      "default": 0
+    }, {
+      "name": "payloadUnion",
+      "doc": "This contains the main payload of the admin operation",
+      "type": [
+        {
+          "name": "StoreCreation",
+          "type": "record",
+          "fields": [
+            {
+              "name": "clusterName",
+              "type": "string"
+            },
+            {
+              "name": "storeName",
+              "type": "string"
+            },
+            {
+              "name": "owner",
+              "type": "string"
+            },
+            {
+              "name": "keySchema",
+              "type": {
+                "type": "record",
+                "name": "SchemaMeta",
+                "fields": [
+                  {"name": "schemaType", "type": "int", "doc": "0 => Avro-1.4, and we can add more if necessary"},
+                  {"name": "definition", "type": "string"}
+                ]
+              }
+            },
+            {
+              "name": "valueSchema",
+              "type": "SchemaMeta"
+            }
+          ]
+        },
+        {
+          "name": "ValueSchemaCreation",
+          "type": "record",
+          "fields": [
+            {
+              "name": "clusterName",
+              "type": "string"
+            },
+            {
+              "name": "storeName",
+              "type": "string"
+            },
+            {
+              "name": "schema",
+              "type": "SchemaMeta"
+            },
+            {
+              "name": "schemaId",
+              "type": "int"
+            },
+            {
+              "name": "doUpdateSupersetSchemaID",
+              "type": "boolean",
+              "doc": "Whether this superset schema ID should be updated to be the value schema ID for this store.",
+              "default": false
+            }
+          ]
+        },
+        {
+          "name": "PauseStore",
+          "type": "record",
+          "fields": [
+            {
+              "name": "clusterName",
+              "type": "string"
+            },
+            {
+              "name": "storeName",
+              "type": "string"
+            }
+          ]
+        },
+        {
+          "name": "ResumeStore",
+          "type": "record",
+          "fields": [
+            {
+              "name": "clusterName",
+              "type": "string"
+            },
+            {
+              "name": "storeName",
+              "type": "string"
+            }
+          ]
+        },
+        {
+          "name": "KillOfflinePushJob",
+          "type": "record",
+          "fields": [
+            {
+              "name": "clusterName",
+              "type": "string"
+            },
+            {
+              "name": "kafkaTopic",
+              "type": "string"
+            }
+          ]
+        },
+        {
+          "name": "DisableStoreRead",
+          "type": "record",
+          "fields": [
+            {
+              "name": "clusterName",
+              "type": "string"
+            },
+            {
+              "name": "storeName",
+              "type": "string"
+            }
+          ]
+        },
+        {
+          "name": "EnableStoreRead",
+          "type": "record",
+          "fields": [
+            {
+              "name": "clusterName",
+              "type": "string"
+            },
+            {
+              "name": "storeName",
+              "type": "string"
+            }
+          ]
+        },
+        {
+          "name": "DeleteAllVersions",
+          "type": "record",
+          "fields": [
+            {
+              "name": "clusterName",
+              "type": "string"
+            },
+            {
+              "name": "storeName",
+              "type": "string"
+            }
+          ]
+        },
+        {
+          "name": "SetStoreOwner",
+          "type": "record",
+          "fields": [
+            {
+              "name": "clusterName",
+              "type": "string"
+            },
+            {
+              "name": "storeName",
+              "type": "string"
+            },
+            {
+              "name": "owner",
+              "type": "string"
+            }
+          ]
+        },
+        {
+          "name": "SetStorePartitionCount",
+          "type": "record",
+          "fields": [
+            {
+              "name": "clusterName",
+              "type": "string"
+            },
+            {
+              "name": "storeName",
+              "type": "string"
+            },
+            {
+              "name": "partitionNum",
+              "type": "int"
+            }
+          ]
+        },
+        {
+          "name": "SetStoreCurrentVersion",
+          "type": "record",
+          "fields": [
+            {
+              "name": "clusterName",
+              "type": "string"
+            },
+            {
+              "name": 
"storeName", + "type": "string" + }, + { + "name": "currentVersion", + "type": "int" + } + ] + }, + { + "name": "UpdateStore", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "owner", + "type": "string" + }, + { + "name": "partitionNum", + "type": "int" + }, + { + "name": "currentVersion", + "type": "int" + }, + { + "name": "enableReads", + "type": "boolean" + }, + { + "name": "enableWrites", + "type": "boolean" + }, + { + "name": "storageQuotaInByte", + "type": "long", + "default": 21474836480 + }, + { + "name": "readQuotaInCU", + "type": "long", + "default": 1800 + }, + { + "name": "hybridStoreConfig", + "type": [ + "null", + { + "name": "HybridStoreConfigRecord", + "type": "record", + "fields": [ + { + "name": "rewindTimeInSeconds", + "type": "long" + }, + { + "name": "offsetLagThresholdToGoOnline", + "type": "long" + }, + { + "name": "producerTimestampLagThresholdToGoOnlineInSeconds", + "type": "long", + "default": -1 + }, + { + "name": "dataReplicationPolicy", + "doc": "Real-time Samza job data replication policy. Using int because Avro Enums are not evolvable 0 => NON_AGGREGATE, 1 => AGGREGATE, 2 => NONE, 3 => ACTIVE_ACTIVE", + "type": "int", + "default": 0 + }, + { + "name": "bufferReplayPolicy", + "type": "int", + "doc": "Policy that will be used during buffer replay. rewindTimeInSeconds defines the delta. 0 => REWIND_FROM_EOP (replay from 'EOP - rewindTimeInSeconds'), 1 => REWIND_FROM_SOP (replay from 'SOP - rewindTimeInSeconds')", + "default": 0 + }, + {"name": "realTimeTopicName", "type": "string", "default": "", "doc": "Name of the real time topic this store/version uses"} + ] + } + ], + "default": null + }, + { + "name": "accessControlled", + "type": "boolean", + "default": false + }, + { + "name": "compressionStrategy", + "doc": "Using int because Avro Enums are not evolvable", + "type": "int", + "default": 0 + }, + { + "name": "chunkingEnabled", + "type": "boolean", + "default": false + }, + { + "name": "rmdChunkingEnabled", + "type": "boolean", + "default": false + }, + { + "name": "singleGetRouterCacheEnabled", + "aliases": ["routerCacheEnabled"], + "type": "boolean", + "default": false + }, + { + "name": "batchGetRouterCacheEnabled", + "type": "boolean", + "default": false + }, + { + "name": "batchGetLimit", + "doc": "The max key number allowed in batch get request, and Venice will use cluster-level config if the limit (not positive) is not valid", + "type": "int", + "default": -1 + }, + { + "name": "numVersionsToPreserve", + "doc": "The max number of versions the store should preserve. Venice will use cluster-level config if the number is 0 here.", + "type": "int", + "default": 0 + }, + { + "name": "incrementalPushEnabled", + "doc": "a flag to see if the store supports incremental push or not", + "type": "boolean", + "default": false + }, + { + "name": "separateRealTimeTopicEnabled", + "doc": "Flag to see if the store supports separate real-time topic for incremental push.", + "type": "boolean", + "default": false + }, + { + "name": "isMigrating", + "doc": "Whether or not the store is in the process of migration", + "type": "boolean", + "default": false + }, + { + "name": "writeComputationEnabled", + "doc": "Whether write-path computation feature is enabled for this store", + "type": "boolean", + "default": false + }, + { + "name": "replicationMetadataVersionID", + "doc": "RMD (Replication metadata) version ID on the store-level. 
Default -1 means NOT_SET and the cluster-level RMD version ID should be used for stores.", + "type": "int", + "default": -1 + }, + { + "name": "readComputationEnabled", + "doc": "Whether read-path computation feature is enabled for this store", + "type": "boolean", + "default": false + }, + { + "name": "bootstrapToOnlineTimeoutInHours", + "doc": "Maximum number of hours allowed for the store to transition from bootstrap to online state", + "type": "int", + "default": 24 + }, + { + "name": "leaderFollowerModelEnabled", + "doc": "Whether or not to use leader follower state transition model for upcoming version", + "type": "boolean", + "default": false + }, + { + "name": "backupStrategy", + "doc": "Strategies to store backup versions.", + "type": "int", + "default": 0 + }, + { + "name": "clientDecompressionEnabled", + "type": "boolean", + "default": true + }, + { + "name": "schemaAutoRegisterFromPushJobEnabled", + "type": "boolean", + "default": false + }, + { + "name": "hybridStoreOverheadBypass", + "type": "boolean", + "default": false + }, + { + "name": "hybridStoreDiskQuotaEnabled", + "doc": "Whether or not to enable disk storage quota for a hybrid store", + "type": "boolean", + "default": false + }, + { + "name": "ETLStoreConfig", + "type": [ + "null", + { + "name": "ETLStoreConfigRecord", + "type": "record", + "fields": [ + { + "name": "etledUserProxyAccount", + "type": ["null", "string"] + }, + { + "name": "regularVersionETLEnabled", + "type": "boolean" + }, + { + "name": "futureVersionETLEnabled", + "type": "boolean" + } + ] + } + ], + "default": null + }, + { + "name": "partitionerConfig", + "type": [ + "null", + { + "name": "PartitionerConfigRecord", + "type": "record", + "fields": [ + { + "name": "partitionerClass", + "type": "string" + }, + { + "name": "partitionerParams", + "type": { + "type": "map", + "values": "string" + } + }, + { + "name": "amplificationFactor", + "type": "int" + } + ] + } + ], + "default": null + }, + { + "name": "nativeReplicationEnabled", + "type": "boolean", + "default": false + }, + { + "name": "pushStreamSourceAddress", + "type": ["null", "string"], + "default": null + }, + { + "name": "largestUsedVersionNumber", + "type": ["null", "int"], + "default": null + }, + { + "name": "incrementalPushPolicy", + "doc": "Incremental Push Policy to reconcile with real time pushes. 
Using int because Avro Enums are not evolvable 0 => PUSH_TO_VERSION_TOPIC, 1 => INCREMENTAL_PUSH_SAME_AS_REAL_TIME", + "type": "int", + "default": 0 + }, + { + "name": "backupVersionRetentionMs", + "type": "long", + "doc": "Backup version retention time after a new version is promoted to the current version, if not specified, Venice will use the configured retention as the default policy", + "default": -1 + }, + { + "name": "replicationFactor", + "doc": "number of replica each store version will have", + "type": "int", + "default": 3 + }, + { + "name": "migrationDuplicateStore", + "doc": "Whether or not the store is a duplicate store in the process of migration", + "type": "boolean", + "default": false + }, + { + "name": "nativeReplicationSourceFabric", + "doc": "The source fabric to be used when the store is running in Native Replication mode.", + "type": ["null", "string"], + "default": null + }, + { + "name": "activeActiveReplicationEnabled", + "doc": "A command option to enable/disable Active/Active replication feature for a store", + "type": "boolean", + "default": false + }, + { + "name": "disableMetaStore", + "doc": "An UpdateStore command option to disable the companion meta system store", + "type": "boolean", + "default": false + }, + { + "name": "disableDavinciPushStatusStore", + "doc": "An UpdateStore command option to disable the companion davinci push status store", + "type": "boolean", + "default": false + }, + { + "name": "applyTargetVersionFilterForIncPush", + "doc": "An UpdateStore command option to enable/disable applying the target version filter for incremental pushes", + "type": "boolean", + "default": false + }, + { + "name": "updatedConfigsList", + "doc": "The list that contains all updated configs by the UpdateStore command. Most of the fields in UpdateStore are not optional, and changing those fields to Optional (Union) is not a backward compatible change, so we have to add an addition array field to record all updated configs in parent controller.", + "type": { + "type": "array", + "items": "string" + }, + "default": [] + }, + { + "name": "replicateAllConfigs", + "doc": "A flag to indicate whether all store configs in parent cluster will be replicated to child clusters; true by default, so that existing UpdateStore messages in Admin topic will behave the same as before.", + "type": "boolean", + "default": true + }, + { + "name": "regionsFilter", + "doc": "A list of regions that will be impacted by the UpdateStore command", + "type": ["null", "string"], + "default": null + }, + { + "name": "storagePersona", + "doc": "The name of the StoragePersona to add to the store", + "type": ["null", "string"], + "default": null + }, + { + "name": "views", + "doc": "A map of views which describe and configure a downstream view of a venice store. Keys in this map are for convenience of managing configs.", + "type": ["null", + { + "type":"map", + "java-key-class": "java.lang.String", + "avro.java.string": "String", + "values": { + "name": "StoreViewConfigRecord", + "type": "record", + "doc": "A configuration for a particular view. This config should inform Venice leaders how to transform and transmit data to destination views.", + "fields": [ + { + "name": "viewClassName", + "type": "string", + "doc": "This informs what kind of view we are materializing. This then informs what kind of parameters are passed to parse this input. 
This is expected to be a fully formed class path name for materialization.", + "default": "" + }, + { + "name": "viewParameters", + "doc": "Optional parameters to be passed to the given view config.", + "type": ["null", + { + "type": "map", + "java-key-class": "java.lang.String", + "avro.java.string": "String", + "values": { "type": "string", "avro.java.string": "String" } + } + ], + "default": null + } + ] + } + }], + "default": null + }, + { + "name": "latestSuperSetValueSchemaId", + "doc": "The schema id for the latest superset schema", + "type" : "int", + "default": -1 + }, + { + "name": "storageNodeReadQuotaEnabled", + "doc": "Whether storage node read quota is enabled for this store", + "type": "boolean", + "default": false + }, + { + "name": "minCompactionLagSeconds", + "doc": "Store-level version topic min compaction lag", + "type": "long", + "default": -1 + }, + { + "name": "maxCompactionLagSeconds", + "doc": "Store-level version topic max compaction lag", + "type": "long", + "default": -1 + }, + { + "name": "maxRecordSizeBytes", + "doc": "Store-level maximum size of any record in bytes for batch push jobs", + "type": "int", + "default": -1 + }, + { + "name": "maxNearlineRecordSizeBytes", + "doc": "Store-level maximum size of any record in bytes for nearline jobs with partial updates", + "type": "int", + "default": -1 + }, + { + "name": "unusedSchemaDeletionEnabled", + "doc": "Whether unused schema deletion is enabled or not.", + "type": "boolean", + "default": false + }, + { + "name": "blobTransferEnabled", + "doc": "Flag to indicate if the blob transfer is allowed or not", + "type": "boolean", + "default": false + }, + { + "name": "nearlineProducerCompressionEnabled", + "doc": "Flag to control whether the producer in Server for nearline workload will enable compression or not", + "type": "boolean", + "default": true + }, + { + "name": "nearlineProducerCountPerWriter", + "doc": "How many producers will be used for the nearline producer in Server to improve producing throughput", + "type": "int", + "default": 1 + } + ] + }, + { + "name": "DeleteStore", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "largestUsedVersionNumber", + "type": "int" + } + ] + }, + { + "name": "DeleteOldVersion", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "versionNum", + "type": "int" + } + ] + }, + { + "name": "MigrateStore", + "type": "record", + "fields": [ + { + "name": "srcClusterName", + "type": "string" + }, + { + "name": "destClusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + } + ] + }, + { + "name": "AbortMigration", + "type": "record", + "fields": [ + { + "name": "srcClusterName", + "type": "string" + }, + { + "name": "destClusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + } + ] + }, + { + "name": "AddVersion", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "pushJobId", + "type": "string" + }, + { + "name": "versionNum", + "type": "int" + }, + { + "name": "numberOfPartitions", + "type": "int" + }, + { + "name": "pushType", + "doc": "The push type of the new version, 0 => BATCH, 1 => STREAM_REPROCESSING. 
Previous add version messages will default to BATCH and this is safe because they were created when BATCH was the only version type", + "type": "int", + "default": 0 + }, + { + "name": "pushStreamSourceAddress", + "type": ["null", "string"], + "default": null + }, + { + "name": "rewindTimeInSecondsOverride", + "doc": "The overridable rewind time config for this specific version of a hybrid store, and if it is not specified, the new version will use the store-level rewind time config", + "type": "long", + "default": -1 + }, + { + "name": "timestampMetadataVersionId", + "doc": "The A/A metadata schema version ID that will be used to deserialize metadataPayload.", + "type": "int", + "default": -1 + }, + { + "name": "versionSwapDeferred", + "doc": "Indicates if swapping this version to current version after push completion should be initiated or not", + "type": "boolean", + "default": false + }, + { + "name": "targetedRegions", + "doc": "The comma-separated list of regions for a targeted region push. If set, this admin message should only be consumed by the targeted regions", + "type": [ + "null", + { + "type": "array", + "items": "string" + } + ], + "default": null + }, + { + "name": "repushSourceVersion", + "doc": "Indicates the source version from which a repush version is created", + "type": "int", + "default": -1 + } + ] + }, + { + "name": "DerivedSchemaCreation", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "schema", + "type": "SchemaMeta" + }, + { + "name": "valueSchemaId", + "type": "int" + }, + { + "name": "derivedSchemaId", + "type": "int" + } + ] + }, + { + "name": "SupersetSchemaCreation", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "valueSchema", + "type": "SchemaMeta" + }, + { + "name": "valueSchemaId", + "type": "int" + }, + { + "name": "supersetSchema", + "type": "SchemaMeta" + }, + { + "name": "supersetSchemaId", + "type": "int" + } + ] + }, + { + "name": "ConfigureNativeReplicationForCluster", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeType", + "type": "string" + }, + { + "name": "enabled", + "type": "boolean" + }, + { + "name": "nativeReplicationSourceRegion", + "doc": "The source region to be used when the store is running in Native Replication mode.", + "type": ["null", "string"], + "default": null + }, + { + "name": "regionsFilter", + "type": ["null", "string"], + "default": null + } + ] + }, + { + "name": "MetadataSchemaCreation", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "valueSchemaId", + "type": "int" + }, + { + "name": "metadataSchema", + "type": "SchemaMeta" + }, + { + "name": "timestampMetadataVersionId", + "type": "int", + "aliases": ["metadataVersionId"], + "default": -1 + } + ] + }, + { + "name": "ConfigureActiveActiveReplicationForCluster", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeType", + "type": "string" + }, + { + "name": "enabled", + "type": "boolean" + }, + { + "name": "regionsFilter", + "type": ["null", "string"], + "default": null + } + ] + }, { + "name": "ConfigureIncrementalPushForCluster", + "doc": "A command to migrate all incremental push stores in a cluster to a specific incremental push 
policy.", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "incrementalPushPolicyToFilter", + "doc": "If this batch update command is trying to configure existing incremental push store type, their incremental push policy should also match this filter before the batch update command applies any change to them. Default value is -1, meaning there is no filter.", + "type": "int", + "default": -1 + }, + { + "name": "incrementalPushPolicyToApply", + "doc": "This field will determine what incremental push policy will be applied to the selected stores. Default value is 1, which is the INCREMENTAL_PUSH_SAME_AS_REAL_TIME policy", + "type": "int", + "default": 1 + }, + { + "name": "regionsFilter", + "type": ["null", "string"], + "default": null + } + ] + }, { + "name": "MetaSystemStoreAutoCreationValidation", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + } + ] + }, { + "name": "PushStatusSystemStoreAutoCreationValidation", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + } + ] + }, { + "name": "CreateStoragePersona", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "name", + "type": "string" + }, + { + "name": "quotaNumber", + "type": "long" + }, + { + "name": "storesToEnforce", + "type": { + "type": "array", + "items": "string", + "default": [] + } + }, + { + "name": "owners", + "type": { + "type": "array", + "items": "string", + "default": [] + } + } + ] + }, { + "name": "DeleteStoragePersona", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "name", + "type": "string" + } + ] + }, { + "name": "UpdateStoragePersona", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, { + "name": "name", + "type": "string" + }, { + "name": "quotaNumber", + "type": ["null","long"], + "default": null + }, { + "name": "storesToEnforce", + "type": [ + "null", + { + "type": "array", + "items": "string" + } + ], + "default": null + }, { + "name": "owners", + "type": [ + "null", + { + "type": "array", + "items": "string" + } + ], + "default": null + } + ] + }, + { + "name": "DeleteUnusedValueSchemas", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "schemaIds", + "type": { + "type": "array", + "items": "int", + "default": [] + } + } + ] + }, + { + "name": "RollbackCurrentVersion", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "regionsFilter", + "doc": "A list of regions that will be impacted by the RollbackCurrentVersion command", + "type": ["null", "string"], + "default": null + } + ] + }, + { + "name": "RollForwardCurrentVersion", + "type": "record", + "fields": [ + { + "name": "clusterName", + "type": "string" + }, + { + "name": "storeName", + "type": "string" + }, + { + "name": "regionsFilter", + "doc": "A list of regions that will be impacted by the RollForwardCurrentVersion command", + "type": ["null", "string"], + "default": null + } + ] + } + ] + } + ] +} diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithoutCluster.java 
diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithoutCluster.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithoutCluster.java index 9275758a4c..230c41db31 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithoutCluster.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/TestVeniceHelixAdminWithoutCluster.java @@ -48,12 +48,14 @@ public void canMergeNewHybridConfigValuesToOldStore() { Optional<Long> timeLag = Optional.of(300L); Optional<DataReplicationPolicy> dataReplicationPolicy = Optional.of(DataReplicationPolicy.AGGREGATE); Optional<BufferReplayPolicy> bufferReplayPolicy = Optional.of(BufferReplayPolicy.REWIND_FROM_EOP); + Optional<String> realTimeTopicName = Optional.of("storeName_rt"); HybridStoreConfig hybridStoreConfig = VeniceHelixAdmin.mergeNewSettingsIntoOldHybridStoreConfig( store, Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty()); Assert.assertNull( hybridStoreConfig, @@ -65,7 +67,8 @@ public void canMergeNewHybridConfigValuesToOldStore() { lagOffset, timeLag, dataReplicationPolicy, - bufferReplayPolicy); + bufferReplayPolicy, + realTimeTopicName); Assert.assertNotNull(hybridStoreConfig, "specifying rewind and lagOffset should generate a valid hybrid config"); Assert.assertEquals(hybridStoreConfig.getRewindTimeInSeconds(), 123L); Assert.assertEquals(hybridStoreConfig.getOffsetLagThresholdToGoOnline(), 1500L); @@ -79,6 +82,7 @@ public void canMergeNewHybridConfigValuesToOldStore() { lagOffset, Optional.empty(), Optional.empty(), + Optional.empty(), Optional.empty()); Assert.assertNotNull(hybridStoreConfig, "specifying rewind and lagOffset should generate a valid hybrid config"); Assert.assertEquals(hybridStoreConfig.getRewindTimeInSeconds(), 123L); diff --git a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java index 70a8200fb0..be38cb0311 100644 --- a/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java +++ b/services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.java @@ -18,6 +18,7 @@ import static com.linkedin.venice.controllerapi.ControllerApiConstants.TIME_LAG_TO_GO_ONLINE; import static com.linkedin.venice.controllerapi.ControllerApiConstants.VERSION; import static com.linkedin.venice.controllerapi.ControllerApiConstants.WRITE_COMPUTATION_ENABLED; +import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_REAL_TIME_TOPIC_NAME; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyBoolean; import static org.mockito.Mockito.anyDouble; @@ -931,6 +932,7 @@ public void testSetStore(boolean replicateAllConfigs) throws Exception { hybridConfig.offsetLagThresholdToGoOnline = 1000L; hybridConfig.producerTimestampLagThresholdToGoOnlineInSeconds = 300L; hybridConfig.dataReplicationPolicy = DataReplicationPolicy.AGGREGATE.getValue(); + hybridConfig.realTimeTopicName = DEFAULT_REAL_TIME_TOPIC_NAME; setStore.hybridStoreConfig = hybridConfig; ETLStoreConfigRecord etlStoreConfig = new ETLStoreConfigRecord();
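The TestVeniceHelixAdminWithoutCluster changes above exercise a merge pattern where every store setting arrives as an Optional and only the values the caller actually supplied overwrite the old config, which is why each new parameter shows up as one more trailing Optional.empty() in existing call sites. A generic sketch of that pattern under simplified assumptions; the holder class and merge helper below are hypothetical, and the real mergeNewSettingsIntoOldHybridStoreConfig in VeniceHelixAdmin takes more parameters:

import java.util.Optional;

// Minimal sketch of Optional-based config merging: absent values keep the old setting.
final class HybridSettings {
  final long rewindTimeInSeconds;
  final String realTimeTopicName;

  HybridSettings(long rewindTimeInSeconds, String realTimeTopicName) {
    this.rewindTimeInSeconds = rewindTimeInSeconds;
    this.realTimeTopicName = realTimeTopicName;
  }

  // Each Optional parameter overrides the corresponding old value only when present.
  static HybridSettings merge(
      HybridSettings old,
      Optional<Long> newRewindTime,
      Optional<String> newRealTimeTopicName) {
    return new HybridSettings(
        newRewindTime.orElse(old.rewindTimeInSeconds),
        newRealTimeTopicName.orElse(old.realTimeTopicName));
  }
}

For example, HybridSettings.merge(old, Optional.empty(), Optional.of("storeName_rt")) would keep the existing rewind time while swapping in the new real-time topic name, mirroring how the test passes Optional.of("storeName_rt") alongside empty Optionals for untouched settings.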