From d39fee97faa05949f2ced075c44ddb297f3f3715 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Wed, 27 Nov 2024 17:05:13 -0800 Subject: [PATCH] New constructor for NonFateSharingCache --- .../apache/spark/util/NonFateSharingCache.scala | 16 +++++++++++++++- .../state/RocksDBStateStoreProvider.scala | 17 ++++++++--------- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/NonFateSharingCache.scala b/core/src/main/scala/org/apache/spark/util/NonFateSharingCache.scala index 21184d70b386a..7d01facc1e421 100644 --- a/core/src/main/scala/org/apache/spark/util/NonFateSharingCache.scala +++ b/core/src/main/scala/org/apache/spark/util/NonFateSharingCache.scala @@ -17,7 +17,7 @@ package org.apache.spark.util -import java.util.concurrent.Callable +import java.util.concurrent.{Callable, TimeUnit} import com.google.common.cache.{Cache, CacheBuilder, CacheLoader, LoadingCache} @@ -68,6 +68,20 @@ private[spark] object NonFateSharingCache { override def load(k: K): V = loadingFunc.apply(k) })) } + + def apply[K, V]( + maximumSize: Long, + expireAfterAccessTime: Long, + expireAfterAccessTimeUnit: TimeUnit): NonFateSharingCache[K, V] = { + val builder = CacheBuilder.newBuilder().asInstanceOf[CacheBuilder[K, V]] + if (maximumSize > 0L) { + builder.maximumSize(maximumSize) + } + if(expireAfterAccessTime > 0) { + builder.expireAfterAccess(expireAfterAccessTime, expireAfterAccessTimeUnit) + } + new NonFateSharingCache(builder.build[K, V]()) + } } private[spark] class NonFateSharingCache[K, V](protected val cache: Cache[K, V]) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala index e5a4175aeec1a..c9c987fa1620d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala @@ -23,7 +23,6 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import scala.util.control.NonFatal -import com.google.common.cache.CacheBuilder import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -613,15 +612,15 @@ object RocksDBStateStoreProvider { val VIRTUAL_COL_FAMILY_PREFIX_BYTES = 2 private val MAX_AVRO_ENCODERS_IN_CACHE = 1000 - // Add the cache at companion object level so it persists across provider instances - private val avroEncoderMap: NonFateSharingCache[String, AvroEncoder] = { - val guavaCache = CacheBuilder.newBuilder() - .maximumSize(MAX_AVRO_ENCODERS_IN_CACHE) // Adjust size based on your needs - .expireAfterAccess(1, TimeUnit.HOURS) // Optional: Add expiration if needed - .build[String, AvroEncoder]() + private val AVRO_ENCODER_LIFETIME_HOURS = 1L - new NonFateSharingCache(guavaCache) - } + // Add the cache at companion object level so it persists across provider instances + private val avroEncoderMap: NonFateSharingCache[String, AvroEncoder] = + NonFateSharingCache( + maximumSize = MAX_AVRO_ENCODERS_IN_CACHE, + expireAfterAccessTime = AVRO_ENCODER_LIFETIME_HOURS, + expireAfterAccessTimeUnit = TimeUnit.HOURS + ) def getAvroEnc( stateStoreEncoding: String,