From 020f09995aefce883cae69eed5a8bb0bd698e2c6 Mon Sep 17 00:00:00 2001
From: Felix GV
Date: Thu, 14 Nov 2024 16:24:28 -0500
Subject: [PATCH] [dvc][server][samza] Heap size estimation improvement (#1281)

Introduced two new utilities to make our on-heap memory usage assessment
more accurate, and easier to maintain as class hierarchies evolve:

- ClassSizeEstimator: Predicts (based on assumptions about how the JVM's
  memory layout works) the shallow size of a class. This includes the
  object header, all primitive fields, and all references to other objects
  (but it does not count these other objects, hence the shallowness).
  Reflection is used in this class.

- InstanceSizeEstimator: Predicts the size of instances of a limited
  number of classes. This is not a general-purpose utility, and it
  requires some manual effort to onboard a new class. Reflection is not
  used in this class.

The general design goals are the following:

- Reflection should only be used once per class per runtime, and the
  result of this logic should be stored in static constants.

- On the hot path, there should be no reflection, and we should leverage
  our knowledge of the Venice code base to determine which objects are
  meant to be counted or not. For example, singleton or otherwise shared
  instances should not be counted, since their amortized cost is
  negligible (besides the size of the pointer to refer to them).

The above utilities have been integrated into all classes that implement
the Measurable interface, and several new classes have been given this
interface as well. The Measurable::getSize function has been renamed to
getHeapSize, to minimize the chance that it could clash with other
function names, and to make it extra clear what kind of size is meant.

Miscellaneous:

- Minor efficiency improvements to PubSubMessageHeaders and
  ApacheKafkaUtils so that empty headers (a common case) carry less
  overhead. Also made PubSubMessageHeaders implement Iterable.

- Created a DefaultLeaderMetadata static class in VeniceWriter, so that a
  shared instance can be leveraged in cases where that object is always
  the same (e.g. when producing to the RT topic).

- BlobSnapshotManagerTest improvements:
  - Added timeouts to all tests.
  - Fixed a race condition in testMultipleThreads.
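For reference, the intended usage pattern per the design goals above is
the following minimal sketch (ExampleRecord is a hypothetical class, shown
for illustration only; KafkaKey in this patch follows the same shape):

    import com.linkedin.venice.memory.ClassSizeEstimator;
    import com.linkedin.venice.memory.InstanceSizeEstimator;
    import com.linkedin.venice.memory.Measurable;

    public class ExampleRecord implements Measurable {
      // Hypothetical class. Reflection runs once per class per runtime,
      // and the result is cached in a static constant.
      private static final int SHALLOW_CLASS_OVERHEAD =
          ClassSizeEstimator.getClassOverhead(ExampleRecord.class);

      private final byte[] payload;

      public ExampleRecord(byte[] payload) {
        this.payload = payload; // assumed non-null in this sketch
      }

      @Override
      public int getHeapSize() {
        // Hot path: no reflection; just the precomputed shallow overhead
        // plus the sizes of owned (non-shared) objects.
        return SHALLOW_CLASS_OVERHEAD + InstanceSizeEstimator.getSize(this.payload);
      }
    }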
--- build.gradle | 3 +- .../ImmutableChangeCapturePubSubMessage.java | 5 + .../LeaderFollowerStoreIngestionTask.java | 2 +- .../consumer/LeaderProducedRecordContext.java | 29 +- .../kafka/consumer/StoreBufferService.java | 117 ++++-- .../blobtransfer/BlobSnapshotManagerTest.java | 20 +- .../VeniceChangelogConsumerImplTest.java | 6 +- .../consumer/StoreBufferServiceTest.java | 2 +- .../venice/TestAdminToolConsumption.java | 2 +- gradle/spotbugs/exclude.xml | 31 ++ .../linkedin/venice/common/Measurable.java | 5 - .../protocol/enums/ControlMessageType.java | 56 +-- .../venice/memory/ClassSizeEstimator.java | 250 +++++++++++ .../venice/memory/InstanceSizeEstimator.java | 180 ++++++++ .../linkedin/venice/memory/Measurable.java | 5 + .../com/linkedin/venice/message/KafkaKey.java | 16 +- .../venice/pubsub/ImmutablePubSubMessage.java | 10 + .../adapter/kafka/ApacheKafkaUtils.java | 11 +- .../pubsub/api/EmptyPubSubMessageHeaders.java | 10 + .../venice/pubsub/api/PubSubMessage.java | 5 +- .../pubsub/api/PubSubMessageDeserializer.java | 2 +- .../pubsub/api/PubSubMessageHeader.java | 4 +- .../pubsub/api/PubSubMessageHeaders.java | 36 +- ...SubProducerAdapterConcurrentDelegator.java | 22 +- .../linkedin/venice/utils/ProtocolUtils.java | 36 -- .../collections/MeasurableLinkedHashMap.java | 107 +++++ .../collections/MemoryBoundBlockingQueue.java | 19 +- .../writer/CompletableFutureCallback.java | 20 +- .../SendMessageErrorLoggerCallback.java | 15 +- .../linkedin/venice/writer/VeniceWriter.java | 34 +- .../kafka/consumer/SBSQueueNodeFactory.java | 46 ++ .../venice/memory/ClassSizeEstimatorTest.java | 396 ++++++++++++++++++ .../venice/memory/HeapSizeEstimatorTest.java | 292 +++++++++++++ .../memory/InstanceSizeEstimatorTest.java | 152 +++++++ .../PubSubProducerCallbackSimpleImpl.java | 0 .../adapter/kafka/ApacheKafkaUtilsTest.java | 24 +- ...roducerAdapterConcurrentDelegatorTest.java | 2 +- .../venice/utils/DictionaryUtilsTest.java | 6 +- .../MemoryBoundBlockingQueueTest.java | 12 +- .../linkedin/venice/utils/PubSubHelper.java | 5 + 40 files changed, 1810 insertions(+), 185 deletions(-) delete mode 100644 internal/venice-common/src/main/java/com/linkedin/venice/common/Measurable.java create mode 100644 internal/venice-common/src/main/java/com/linkedin/venice/memory/ClassSizeEstimator.java create mode 100644 internal/venice-common/src/main/java/com/linkedin/venice/memory/InstanceSizeEstimator.java create mode 100644 internal/venice-common/src/main/java/com/linkedin/venice/memory/Measurable.java delete mode 100644 internal/venice-common/src/main/java/com/linkedin/venice/utils/ProtocolUtils.java create mode 100644 internal/venice-common/src/main/java/com/linkedin/venice/utils/collections/MeasurableLinkedHashMap.java create mode 100644 internal/venice-common/src/test/java/com/linkedin/davinci/kafka/consumer/SBSQueueNodeFactory.java create mode 100644 internal/venice-common/src/test/java/com/linkedin/venice/memory/ClassSizeEstimatorTest.java create mode 100644 internal/venice-common/src/test/java/com/linkedin/venice/memory/HeapSizeEstimatorTest.java create mode 100644 internal/venice-common/src/test/java/com/linkedin/venice/memory/InstanceSizeEstimatorTest.java rename internal/venice-common/src/{main => test}/java/com/linkedin/venice/pubsub/adapter/PubSubProducerCallbackSimpleImpl.java (100%) diff --git a/build.gradle b/build.gradle index 9f4fa4e3fa..5c91d3b508 100644 --- a/build.gradle +++ b/build.gradle @@ -369,7 +369,8 @@ subprojects { } classes = classDirs.asFileTree.matching { exclude generatedClasses } 
auxClassPaths += classDirs.asFileTree.matching { include generatedClasses }.each { - println "Excluding generated class ${project.relativePath(it)}" + // Muted to reduce noise, but can be uncommented to debug exclusions + // println "Excluding generated class ${project.relativePath(it)}" } } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ImmutableChangeCapturePubSubMessage.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ImmutableChangeCapturePubSubMessage.java index e6122df533..75bed9b6e9 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ImmutableChangeCapturePubSubMessage.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ImmutableChangeCapturePubSubMessage.java @@ -75,4 +75,9 @@ public String toString() { return "PubSubMessage{" + topicPartition + ", offset=" + offset + ", timestamp=" + timestamp + ", isEndOfBootstrap=" + isEndOfBootstrap + '}'; } + + @Override + public int getHeapSize() { + throw new UnsupportedOperationException("getHeapSize is not supported on " + this.getClass().getSimpleName()); + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index 76042645a2..a09a201c58 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -2154,7 +2154,7 @@ protected void getAndUpdateLeaderCompletedState( && Arrays.equals(kafkaKey.getKey(), KafkaKey.HEART_BEAT.getKey())) { LeaderCompleteState oldState = partitionConsumptionState.getLeaderCompleteState(); LeaderCompleteState newState = oldState; - for (PubSubMessageHeader header: pubSubMessageHeaders.toList()) { + for (PubSubMessageHeader header: pubSubMessageHeaders) { if (header.key().equals(VENICE_LEADER_COMPLETION_STATE_HEADER)) { newState = LeaderCompleteState.valueOf(header.value()[0]); partitionConsumptionState diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducedRecordContext.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducedRecordContext.java index 4cff79e933..75ce762780 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducedRecordContext.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducedRecordContext.java @@ -3,11 +3,14 @@ import static com.linkedin.venice.kafka.protocol.enums.MessageType.CONTROL_MESSAGE; import static com.linkedin.venice.kafka.protocol.enums.MessageType.DELETE; import static com.linkedin.venice.kafka.protocol.enums.MessageType.PUT; +import static com.linkedin.venice.memory.ClassSizeEstimator.getClassOverhead; +import static com.linkedin.venice.memory.InstanceSizeEstimator.getSize; import com.linkedin.venice.kafka.protocol.ControlMessage; import com.linkedin.venice.kafka.protocol.Delete; import com.linkedin.venice.kafka.protocol.Put; import com.linkedin.venice.kafka.protocol.enums.MessageType; +import com.linkedin.venice.memory.Measurable; import java.util.concurrent.CompletableFuture; @@ -25,7 +28,9 @@ * drainer thread completes the persistedToDBFuture. 
*/ -public class LeaderProducedRecordContext { +public class LeaderProducedRecordContext implements Measurable { + private static final int PARTIAL_CLASS_OVERHEAD = + getClassOverhead(LeaderProducedRecordContext.class) + getClassOverhead(CompletableFuture.class); private static final int NO_UPSTREAM = -1; /** * Kafka cluster ID where the source kafka consumer record was consumed from. @@ -235,4 +240,26 @@ private static void checkConsumedOffsetParam(long consumedOffset) { throw new IllegalArgumentException("consumedOffset cannot be negative"); } } + + @Override + public int getHeapSize() { + int size = PARTIAL_CLASS_OVERHEAD + getSize(this.keyBytes); + switch (this.messageType) { + case PUT: + size += getSize((Put) this.valueUnion); + break; + case CONTROL_MESSAGE: + size += getSize((ControlMessage) this.valueUnion); + break; + default: + /** + * Only the above two cases contribute any size. + * + * {@link DELETE} contributes nothing, and {@link com.linkedin.venice.kafka.protocol.enums.MessageType.UPDATE} + * should never happen. + */ + break; + } + return size; + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreBufferService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreBufferService.java index 5e26b3a235..8f47975866 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreBufferService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreBufferService.java @@ -1,16 +1,17 @@ package com.linkedin.davinci.kafka.consumer; -import static com.linkedin.venice.utils.ProtocolUtils.getEstimateOfMessageEnvelopeSizeOnHeap; +import static com.linkedin.venice.memory.ClassSizeEstimator.getClassOverhead; import static java.util.Collections.reverseOrder; import static java.util.Comparator.comparing; import static java.util.stream.Collectors.toList; import com.linkedin.davinci.stats.StoreBufferServiceStats; import com.linkedin.davinci.utils.LockAssistedCompletableFuture; -import com.linkedin.venice.common.Measurable; import com.linkedin.venice.exceptions.VeniceChecksumException; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; +import com.linkedin.venice.memory.ClassSizeEstimator; +import com.linkedin.venice.memory.Measurable; import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.pubsub.api.PubSubMessage; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; @@ -71,42 +72,53 @@ public StoreBufferService( boolean queueLeaderWrites, MetricsRepository metricsRepository, boolean sorted) { - this.drainerNum = drainerNum; - this.blockingQueueArr = new ArrayList<>(); - this.bufferCapacityPerDrainer = bufferCapacityPerDrainer; - for (int cur = 0; cur < drainerNum; ++cur) { - this.blockingQueueArr.add(new MemoryBoundBlockingQueue<>(bufferCapacityPerDrainer, bufferNotifyDelta)); - } - this.isSorted = sorted; - this.leaderRecordHandler = queueLeaderWrites ? this::queueLeaderRecord : StoreBufferService::processRecord; - String metricNamePrefix = sorted ? 
"StoreBufferServiceSorted" : "StoreBufferServiceUnsorted"; - this.storeBufferServiceStats = new StoreBufferServiceStats( - metricsRepository, - metricNamePrefix, - this::getTotalMemoryUsage, - this::getTotalRemainingMemory, - this::getMaxMemoryUsagePerDrainer, - this::getMinMemoryUsagePerDrainer); + this(drainerNum, bufferCapacityPerDrainer, bufferNotifyDelta, queueLeaderWrites, null, metricsRepository, sorted); } /** - * Constructor for testing + * Package-private constructor for testing */ - public StoreBufferService( + StoreBufferService( int drainerNum, long bufferCapacityPerDrainer, long bufferNotifyDelta, boolean queueLeaderWrites, StoreBufferServiceStats stats) { + this(drainerNum, bufferCapacityPerDrainer, bufferNotifyDelta, queueLeaderWrites, stats, null, true); + } + + /** + * Shared code for the main and test constructors. + * + * N.B.: Either {@param stats} or {@param metricsRepository} should be null, but not both. If neither are null, then + * we default to the main code's expected path, meaning that the metric repo will be used to construct a + * {@link StoreBufferServiceStats} instance, and the passed in stats object will be ignored. + */ + private StoreBufferService( + int drainerNum, + long bufferCapacityPerDrainer, + long bufferNotifyDelta, + boolean queueLeaderWrites, + StoreBufferServiceStats stats, + MetricsRepository metricsRepository, + boolean sorted) { this.drainerNum = drainerNum; this.blockingQueueArr = new ArrayList<>(); this.bufferCapacityPerDrainer = bufferCapacityPerDrainer; for (int cur = 0; cur < drainerNum; ++cur) { this.blockingQueueArr.add(new MemoryBoundBlockingQueue<>(bufferCapacityPerDrainer, bufferNotifyDelta)); } + this.isSorted = sorted; this.leaderRecordHandler = queueLeaderWrites ? this::queueLeaderRecord : StoreBufferService::processRecord; - this.storeBufferServiceStats = stats; - this.isSorted = true; + this.storeBufferServiceStats = metricsRepository == null + ? Objects.requireNonNull(stats) + : new StoreBufferServiceStats( + Objects.requireNonNull(metricsRepository), + sorted ? "StoreBufferServiceSorted" : "StoreBufferServiceUnsorted", + this::getTotalMemoryUsage, + this::getTotalRemainingMemory, + this::getMaxMemoryUsagePerDrainer, + this::getMinMemoryUsagePerDrainer); } protected MemoryBoundBlockingQueue getDrainerForConsumerRecord( @@ -378,11 +390,8 @@ public long getMinMemoryUsagePerDrainer() { /** * Queue node type in {@link BlockingQueue} of each drainer thread. */ - private static class QueueNode implements Measurable { - /** - * Considering the overhead of {@link PubSubMessage} and its internal structures. - */ - private static final int QUEUE_NODE_OVERHEAD_IN_BYTE = 256; + static class QueueNode implements Measurable { + private static final int SHALLOW_CLASS_OVERHEAD = ClassSizeEstimator.getClassOverhead(QueueNode.class); private final PubSubMessage consumerRecord; private final StoreIngestionTask ingestionTask; private final String kafkaUrl; @@ -447,15 +456,14 @@ public int hashCode() { return consumerRecord.hashCode(); } + protected int getBaseClassOverhead() { + return SHALLOW_CLASS_OVERHEAD; + } + @Override - public int getSize() { - // For FakePubSubMessage, the key and the value are null. - if (consumerRecord instanceof FakePubSubMessage) { - return QUEUE_NODE_OVERHEAD_IN_BYTE; - } - // N.B.: This is just an estimate. TODO: Consider if it is really useful, and whether to get rid of it. 
- return this.consumerRecord.getKey().getEstimatedObjectSizeOnHeap() - + getEstimateOfMessageEnvelopeSizeOnHeap(this.consumerRecord.getValue()) + QUEUE_NODE_OVERHEAD_IN_BYTE; + public int getHeapSize() { + /** The other non-primitive fields point to shared instances and are therefore ignored. */ + return getBaseClassOverhead() + consumerRecord.getHeapSize(); } @Override @@ -465,6 +473,13 @@ public String toString() { } private static class FollowerQueueNode extends QueueNode { + /** + * N.B.: We don't want to recurse fully into the {@link CompletableFuture}, but we do want to take into account an + * "empty" one. + */ + private static final int PARTIAL_CLASS_OVERHEAD = + getClassOverhead(FollowerQueueNode.class) + getClassOverhead(CompletableFuture.class); + private final CompletableFuture queuedRecordPersistedFuture; public FollowerQueueNode( @@ -491,9 +506,16 @@ public int hashCode() { public boolean equals(Object o) { return super.equals(o); } + + @Override + protected int getBaseClassOverhead() { + return PARTIAL_CLASS_OVERHEAD; + } } - private static class LeaderQueueNode extends QueueNode { + static class LeaderQueueNode extends QueueNode { + private static final int SHALLOW_CLASS_OVERHEAD = ClassSizeEstimator.getClassOverhead(LeaderQueueNode.class); + private final LeaderProducedRecordContext leaderProducedRecordContext; public LeaderQueueNode( @@ -520,9 +542,21 @@ public int hashCode() { public boolean equals(Object o) { return super.equals(o); } + + @Override + protected int getBaseClassOverhead() { + return SHALLOW_CLASS_OVERHEAD + leaderProducedRecordContext.getHeapSize(); + } } private static class CommandQueueNode extends QueueNode { + /** + * N.B.: We don't want to recurse fully into the {@link CompletableFuture}, but we do want to take into account an + * "empty" one. + */ + private static final int PARTIAL_CLASS_OVERHEAD = + getClassOverhead(CommandQueueNode.class) + getClassOverhead(LockAssistedCompletableFuture.class); + enum CommandType { // only supports SYNC_OFFSET command today. SYNC_OFFSET @@ -589,6 +623,10 @@ public int hashCode() { public boolean equals(Object o) { return super.equals(o); } + + protected int getBaseClassOverhead() { + return PARTIAL_CLASS_OVERHEAD; + } } /** @@ -714,6 +752,7 @@ public void run() { } private static class FakePubSubMessage implements PubSubMessage { + private static final int SHALLOW_CLASS_OVERHEAD = ClassSizeEstimator.getClassOverhead(FakePubSubMessage.class); private final PubSubTopicPartition topicPartition; FakePubSubMessage(PubSubTopicPartition topicPartition) { @@ -754,5 +793,11 @@ public int getPayloadSize() { public boolean isEndOfBootstrap() { return false; } + + @Override + public int getHeapSize() { + /** We assume that {@link #topicPartition} is a singleton instance, and therefore we're not counting it. 
*/ + return SHALLOW_CLASS_OVERHEAD; + } } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManagerTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManagerTest.java index 08ebe35dd4..b30cdd2f4c 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManagerTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManagerTest.java @@ -10,6 +10,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertTrue; import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; @@ -19,12 +20,14 @@ import com.linkedin.venice.meta.ReadOnlyStoreRepository; import com.linkedin.venice.meta.Store; import com.linkedin.venice.store.rocksdb.RocksDBUtils; +import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; import java.io.File; import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; import org.mockito.MockedStatic; import org.mockito.Mockito; @@ -36,6 +39,7 @@ public class BlobSnapshotManagerTest { + private static final int TIMEOUT = 30 * Time.MS_PER_SECOND; private static final String STORE_NAME = "test-store"; private static final int VERSION_ID = 1; private static final String TOPIC_NAME = STORE_NAME + "_v" + VERSION_ID; @@ -51,7 +55,7 @@ public class BlobSnapshotManagerTest { private static final BlobTransferPayload blobTransferPayload = new BlobTransferPayload(BASE_PATH, STORE_NAME, VERSION_ID, PARTITION_ID); - @Test + @Test(timeOut = TIMEOUT) public void testHybridSnapshot() { AbstractStorageEngine storageEngine = Mockito.mock(AbstractStorageEngine.class); Mockito.doReturn(storageEngine).when(storageEngineRepository).getLocalStorageEngine(TOPIC_NAME); @@ -76,7 +80,7 @@ public void testHybridSnapshot() { Assert.assertEquals(actualBlobTransferPartitionMetadata, blobTransferPartitionMetadata); } - @Test + @Test(timeOut = TIMEOUT) public void testSameSnapshotWhenConcurrentUsersNotExceedMaxAllowedUsers() { Store mockStore = mock(Store.class); @@ -105,7 +109,7 @@ public void testSameSnapshotWhenConcurrentUsersNotExceedMaxAllowedUsers() { Assert.assertEquals(actualBlobTransferPartitionMetadata, blobTransferPartitionMetadata); } - @Test + @Test(timeOut = TIMEOUT) public void testSameSnapshotWhenConcurrentUsersExceedsMaxAllowedUsers() { Store mockStore = mock(Store.class); @@ -145,7 +149,7 @@ public void testSameSnapshotWhenConcurrentUsersExceedsMaxAllowedUsers() { BlobSnapshotManager.DEFAULT_MAX_CONCURRENT_USERS); } - @Test + @Test(timeOut = TIMEOUT) public void testTwoRequestUsingSameOffset() { // Prepare Store mockStore = mock(Store.class); @@ -180,8 +184,8 @@ public void testTwoRequestUsingSameOffset() { blobTransferPartitionMetadata); } - @Test - public void testMultipleThreads() { + @Test(timeOut = TIMEOUT) + public void testMultipleThreads() throws InterruptedException { final int numberOfThreads = 2; final ExecutorService asyncExecutor = Executors.newFixedThreadPool(numberOfThreads); final CountDownLatch latch = new CountDownLatch(numberOfThreads); @@ -218,10 +222,12 @@ public void testMultipleThreads() { Assert.assertEquals(e.getMessage(), errorMessage); } + 
assertTrue(latch.await(TIMEOUT / 2, TimeUnit.MILLISECONDS)); + Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers(TOPIC_NAME, PARTITION_ID), 0); } - @Test + @Test(timeOut = TIMEOUT) public void testCreateSnapshotForBatch() throws RocksDBException { try (MockedStatic checkpointMockedStatic = Mockito.mockStatic(Checkpoint.class)) { try (MockedStatic fileUtilsMockedStatic = Mockito.mockStatic(FileUtils.class)) { diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java index 9be396e7c2..1ce9115ea8 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java @@ -537,7 +537,7 @@ private PubSubMessage constructVersionSwap PubSubTopic newTopic, int partition, List localHighWatermarks) { - KafkaKey kafkaKey = new KafkaKey(MessageType.CONTROL_MESSAGE, null); + KafkaKey kafkaKey = new KafkaKey(MessageType.CONTROL_MESSAGE, new byte[0]); VersionSwap versionSwapMessage = new VersionSwap(); versionSwapMessage.oldServingVersionTopic = oldTopic.getName(); versionSwapMessage.newServingVersionTopic = newTopic.getName(); @@ -616,7 +616,7 @@ private PubSubMessage constructEndOfPushMe PubSubTopic versionTopic, int partition, Long offset) { - KafkaKey kafkaKey = new KafkaKey(MessageType.CONTROL_MESSAGE, null); + KafkaKey kafkaKey = new KafkaKey(MessageType.CONTROL_MESSAGE, new byte[0]); EndOfPush endOfPush = new EndOfPush(); KafkaMessageEnvelope kafkaMessageEnvelope = new KafkaMessageEnvelope(); ProducerMetadata producerMetadata = new ProducerMetadata(); @@ -633,7 +633,7 @@ private PubSubMessage constructEndOfPushMe private PubSubMessage constructStartOfPushMessage( PubSubTopic versionTopic, int partition) { - KafkaKey kafkaKey = new KafkaKey(MessageType.CONTROL_MESSAGE, null); + KafkaKey kafkaKey = new KafkaKey(MessageType.CONTROL_MESSAGE, new byte[0]); StartOfPush startOfPush = new StartOfPush(); startOfPush.compressionStrategy = CompressionStrategy.NO_OP.getValue(); KafkaMessageEnvelope kafkaMessageEnvelope = new KafkaMessageEnvelope(); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreBufferServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreBufferServiceTest.java index d8d2f383c2..0d9bb3e4dd 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreBufferServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreBufferServiceTest.java @@ -39,7 +39,7 @@ public class StoreBufferServiceTest { private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository(); - private final KafkaKey key = new KafkaKey(MessageType.PUT, null); + private final KafkaKey key = new KafkaKey(MessageType.PUT, new byte[0]); private final Put put = new Put(ByteBuffer.allocate(0), 0, 0, ByteBuffer.allocate(0)); private final KafkaMessageEnvelope value = new KafkaMessageEnvelope(MessageType.PUT.getValue(), new ProducerMetadata(), put, null); diff --git a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminToolConsumption.java b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminToolConsumption.java index bdf2258950..b4c47e5568 100644 --- 
a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminToolConsumption.java +++ b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminToolConsumption.java @@ -156,7 +156,7 @@ public void testAdminToolConsumption() { new ImmutablePubSubMessage<>(kafkaKey, messageEnvelope, pubSubTopicPartition, 0, 0, 20); PubSubMessage pubSubMessage2 = new ImmutablePubSubMessage<>(kafkaKey, messageEnvelope2, pubSubTopicPartition, 1, 0, 10); - KafkaKey kafkaControlMessageKey = new KafkaKey(MessageType.CONTROL_MESSAGE, null); + KafkaKey kafkaControlMessageKey = new KafkaKey(MessageType.CONTROL_MESSAGE, new byte[0]); EndOfPush endOfPush = new EndOfPush(); KafkaMessageEnvelope kafkaMessageEnvelope = new KafkaMessageEnvelope(); kafkaMessageEnvelope.messageType = MessageType.CONTROL_MESSAGE.getValue(); diff --git a/gradle/spotbugs/exclude.xml b/gradle/spotbugs/exclude.xml index 40d69bb0be..3725224437 100644 --- a/gradle/spotbugs/exclude.xml +++ b/gradle/spotbugs/exclude.xml @@ -55,12 +55,27 @@ + + + + + + + + + + + + + + + @@ -189,6 +204,12 @@ + + + + + + @@ -272,6 +293,12 @@ + + + + + + @@ -297,6 +324,10 @@ + + + + diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/common/Measurable.java b/internal/venice-common/src/main/java/com/linkedin/venice/common/Measurable.java deleted file mode 100644 index d76f582722..0000000000 --- a/internal/venice-common/src/main/java/com/linkedin/venice/common/Measurable.java +++ /dev/null @@ -1,5 +0,0 @@ -package com.linkedin.venice.common; - -public interface Measurable { - int getSize(); -} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/ControlMessageType.java b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/ControlMessageType.java index 9170b59b3a..a1a3e33566 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/ControlMessageType.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/ControlMessageType.java @@ -1,19 +1,21 @@ package com.linkedin.venice.kafka.protocol.enums; -import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.exceptions.VeniceMessageException; import com.linkedin.venice.kafka.protocol.ControlMessage; import com.linkedin.venice.kafka.protocol.EndOfIncrementalPush; import com.linkedin.venice.kafka.protocol.EndOfPush; import com.linkedin.venice.kafka.protocol.EndOfSegment; +import com.linkedin.venice.kafka.protocol.StartOfBufferReplay; import com.linkedin.venice.kafka.protocol.StartOfIncrementalPush; import com.linkedin.venice.kafka.protocol.StartOfPush; import com.linkedin.venice.kafka.protocol.StartOfSegment; import com.linkedin.venice.kafka.protocol.TopicSwitch; import com.linkedin.venice.kafka.protocol.VersionSwap; +import com.linkedin.venice.memory.ClassSizeEstimator; import com.linkedin.venice.utils.EnumUtils; import com.linkedin.venice.utils.VeniceEnumValue; import java.util.List; +import java.util.function.Supplier; /** @@ -24,15 +26,35 @@ * not support evolution (i.e.: adding values) properly. 
*/ public enum ControlMessageType implements VeniceEnumValue { - START_OF_PUSH(0), END_OF_PUSH(1), START_OF_SEGMENT(2), END_OF_SEGMENT(3), @Deprecated - START_OF_BUFFER_REPLAY(4), START_OF_INCREMENTAL_PUSH(5), END_OF_INCREMENTAL_PUSH(6), TOPIC_SWITCH(7), VERSION_SWAP(8); + START_OF_PUSH(0, () -> new StartOfPush()), + + END_OF_PUSH(1, () -> new EndOfPush()), + + START_OF_SEGMENT(2, () -> new StartOfSegment()), + + END_OF_SEGMENT(3, () -> new EndOfSegment()), + + @Deprecated + START_OF_BUFFER_REPLAY(4, () -> new StartOfBufferReplay()), + + START_OF_INCREMENTAL_PUSH(5, () -> new StartOfIncrementalPush()), + + END_OF_INCREMENTAL_PUSH(6, () -> new EndOfIncrementalPush()), + + TOPIC_SWITCH(7, () -> new TopicSwitch()), + + VERSION_SWAP(8, () -> new VersionSwap()); /** The value is the byte used on the wire format */ private final int value; + private final Supplier constructor; + private final int shallowClassOverhead; private static final List TYPES = EnumUtils.getEnumValuesList(ControlMessageType.class); - ControlMessageType(int value) { + ControlMessageType(int value, Supplier constructor) { this.value = value; + this.constructor = constructor; + this.shallowClassOverhead = ClassSizeEstimator.getClassOverhead(constructor.get().getClass()); } @Override @@ -54,27 +76,7 @@ public int getValue() { * - {@link VersionSwap} */ public Object getNewInstance() { - switch (valueOf(value)) { - case START_OF_PUSH: - return new StartOfPush(); - case END_OF_PUSH: - return new EndOfPush(); - case START_OF_SEGMENT: - return new StartOfSegment(); - case END_OF_SEGMENT: - return new EndOfSegment(); - case START_OF_INCREMENTAL_PUSH: - return new StartOfIncrementalPush(); - case END_OF_INCREMENTAL_PUSH: - return new EndOfIncrementalPush(); - case TOPIC_SWITCH: - return new TopicSwitch(); - case VERSION_SWAP: - return new VersionSwap(); - - default: - throw new VeniceException("Unsupported " + getClass().getSimpleName() + " value: " + value); - } + return this.constructor.get(); } public static ControlMessageType valueOf(int value) { @@ -84,4 +86,8 @@ public static ControlMessageType valueOf(int value) { public static ControlMessageType valueOf(ControlMessage controlMessage) { return valueOf(controlMessage.controlMessageType); } + + public int getShallowClassOverhead() { + return this.shallowClassOverhead; + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/memory/ClassSizeEstimator.java b/internal/venice-common/src/main/java/com/linkedin/venice/memory/ClassSizeEstimator.java new file mode 100644 index 0000000000..a03f3eea67 --- /dev/null +++ b/internal/venice-common/src/main/java/com/linkedin/venice/memory/ClassSizeEstimator.java @@ -0,0 +1,250 @@ +package com.linkedin.venice.memory; + +import com.linkedin.venice.utils.Utils; +import com.sun.management.HotSpotDiagnosticMXBean; +import java.lang.management.ManagementFactory; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.ArrayList; +import java.util.List; +import javax.annotation.Nonnull; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +/** + * Utility class to help in implementing {@link Measurable#getHeapSize()}. A couple of important points: + * + * 1. This utility class does not "measure" the heap size, but rather attempts to "predict" it, based on knowledge of + * the internals of the JVM. If any of the assumptions are wrong, then of course the results will be inaccurate. + * 2. This utility class assumes we are using the HotSpot JVM. 
+ */ +public class ClassSizeEstimator { + private static final Logger LOGGER = LogManager.getLogger(ClassSizeEstimator.class); + private static final ClassValue KNOWN_SHALLOW_SIZES = new ClassValue() { + @Override + protected Integer computeValue(Class type) { + return computeClassOverhead(type); + } + }; + private static final boolean IS_64_BITS; + private static final boolean COMPRESSED_OOPS; + private static final boolean COMPRESSED_CLASS_POINTERS; + private static final int ALIGNMENT_SIZE; + private static final int OBJECT_HEADER_SIZE; + static final int ARRAY_HEADER_SIZE; + private static final int POINTER_SIZE; + private static final int JAVA_MAJOR_VERSION; + + static { + /** + * This prop name looks sketchy, but I've seen it mentioned in a couple of posts online, so I'm trusting it for + * now... In any case, if that property is not found, then we'll default to 64 bits, which is a conservative + * assumption (i.e., it would cause us to over-estimate the size on heap)... + */ + String propertyName = "sun.arch.data.model"; + String arch = System.getProperty(propertyName); + LOGGER.debug("System property {} has value: {}", propertyName, arch); + IS_64_BITS = (arch == null) || !arch.contains("32"); + COMPRESSED_OOPS = getBooleanVmOption("UseCompressedOops"); + COMPRESSED_CLASS_POINTERS = getBooleanVmOption("UseCompressedClassPointers"); + ALIGNMENT_SIZE = IS_64_BITS ? 8 : 4; // Also serves as the object header's "mark word" size + OBJECT_HEADER_SIZE = ALIGNMENT_SIZE + (COMPRESSED_CLASS_POINTERS ? 4 : 8); + /** + * The "array base" is always word-aligned, which under some circumstances (in 32 bits JVMs, or in 64 bits JVMs + * where {@link COMPRESSED_OOPS} is false, either due to a large heap or to explicit disabling) causes "internal + * space loss". + * + * See: https://shipilev.net/jvm/objects-inside-out/#_observation_array_base_is_aligned + */ + ARRAY_HEADER_SIZE = roundUpToNearest(OBJECT_HEADER_SIZE + Integer.BYTES, ALIGNMENT_SIZE); + POINTER_SIZE = COMPRESSED_OOPS ? 4 : 8; + JAVA_MAJOR_VERSION = Utils.getJavaMajorVersion(); + } + + private ClassSizeEstimator() { + // Static utility + } + + /** + * This function provides the "shallow size" of each instance of the given class, meaning that all pointers are + * considered to be null. + * + * Intended usage: This function is a helper to make it easier to implement {@link Measurable#getHeapSize()}. It + * should not be called on the hot path. Rather, it should be called at most once per JVM runtime per class of + * interest and the result should be stored in a static field. If an instance of a measured class contains only + * primitive fields or all its non-primitive fields are null, then the result of this function is effectively the heap + * size of that instance. If these conditions are not true, then a function should be implemented which uses this + * class overhead as a base and then adds the size of any referenced Objects. For example, see + * {@link InstanceSizeEstimator#getObjectSize(Object)}. + * + * @param c The {@link Class} for which to predict the "shallow overhead", as defined above. + * @return The base overhead of the class, which can be any positive number (including zero). 
+ */ + public static int getClassOverhead(@Nonnull final Class c) { + Integer knownSize = KNOWN_SHALLOW_SIZES.get(c); + if (knownSize != null) { + return knownSize; + } + return computeClassOverhead(c); + } + + private static int computeClassOverhead(@Nonnull final Class c) { + if (c == null) { + throw new NullPointerException("The class param must not be null."); + } + + if (c.isEnum()) { + /** + * Each enum value is stored statically, and so the per-instance overhead of enum fields is only the pointer size, + * already taken into account in the field loop below. + */ + return 0; + } + + if (c.isPrimitive()) { + return getPrimitiveSize(c); + } + + int size = c.isArray() ? ARRAY_HEADER_SIZE : OBJECT_HEADER_SIZE; + + /** + * We need to measure the overhead of fields for the passed in class as well as all parents. The order in which we + * traverse these classes matters in older Java versions (prior to 15) and not in newer ones, but for the sake of + * minimizing code size, we will always iterate in the order relevant to older Java versions. + */ + List classHierarchyFromSubclassToParent = new ArrayList<>(); + classHierarchyFromSubclassToParent.add(c); + Class parentClass = c.getSuperclass(); + while (parentClass != null) { + classHierarchyFromSubclassToParent.add(parentClass); + parentClass = parentClass.getSuperclass(); + } + + // Iterate from the end to the beginning, so we go from parent to sub + for (int i = classHierarchyFromSubclassToParent.size() - 1; i >= 0; i--) { + int classFieldsOverhead = overheadOfFields(classHierarchyFromSubclassToParent.get(i)); + if (classFieldsOverhead == 0) { + continue; + } + size += classFieldsOverhead; + + if (JAVA_MAJOR_VERSION < 15) { + /** + * In older Java versions, the field layout would always order the fields from the parent class to subclass. + * Within a class, the fields could be re-ordered to optimize packing the fields within the "alignment shadow", + * but not across classes of the hierarchy. + * + * See: https://shipilev.net/jvm/objects-inside-out/#_superclass_gaps + */ + if (i > 0) { + /** + * We align for each class, except the last one, since we'll take care of it below. BUT, importantly, at this + * stage we align by pointer size, NOT by alignment size. + */ + size = roundUpToNearest(size, POINTER_SIZE); + } + } + } + + // We align once at the end no matter the Java version + size = roundUpToNearestAlignment(size); + + return size; + } + + /** Based on: https://shipilev.net/jvm/objects-inside-out/#_data_types_and_their_representation */ + private static int getPrimitiveSize(Class c) { + if (c.isPrimitive()) { + if (c.equals(boolean.class)) { + return 1; + } else if (c.equals(byte.class)) { + return Byte.BYTES; + } else if (c.equals(char.class)) { + return Character.BYTES; + } else if (c.equals(short.class)) { + return Short.BYTES; + } else if (c.equals(int.class)) { + return Integer.BYTES; + } else if (c.equals(float.class)) { + return Float.BYTES; + } else if (c.equals(long.class)) { + return Long.BYTES; + } else if (c.equals(double.class)) { + return Double.BYTES; + } + + // Defensive code + throw new IllegalStateException( + "Class " + c.getSimpleName() + + " is said to be a primitive but does not conform to any known primitive type!"); + } else { + throw new IllegalArgumentException("Class " + c.getSimpleName() + " is not a primitive!"); + } + } + + static int roundUpToNearestAlignment(int size) { + return roundUpToNearest(size, ALIGNMENT_SIZE); + } + + /** Deal with alignment by rounding up to the nearest boundary. 
*/ private static int roundUpToNearest(int size, int intervalSize) { + int partialAlignmentWindowUsage = size % intervalSize; + int waste = partialAlignmentWindowUsage == 0 ? 0 : intervalSize - partialAlignmentWindowUsage; + int finalSize = size + waste; + return finalSize; + } + + private static int overheadOfFields(Class c) { + int size = 0; + + /** + * {@link Class#getDeclaredFields()} returns the fields (of all visibility, from private to public and everything + * in between) of the class, but not of its parent class. + */ + for (Field f: c.getDeclaredFields()) { + if (Modifier.isStatic(f.getModifiers())) { + continue; + } + Class fieldClass = f.getType(); + if (fieldClass.isPrimitive()) { + size += KNOWN_SHALLOW_SIZES.get(fieldClass); + } else { + /** + * Only primitives are stored in-line within the object, while all non-primitives are stored elsewhere on the + * heap, with a pointer within the object to reference them. + */ + size += POINTER_SIZE; + } + } + + /** + * N.B.: The output of this function could be cached, though we're currently not doing it since the intent is not + * to call this on the hot path anyway. + */ + + return size; + } + + /** Package-private on purpose, for tests... */ + static boolean is64bitsJVM() { + return IS_64_BITS; + } + + /** Package-private on purpose, for tests... */ + static boolean isUseCompressedOopsEnabled() { + return COMPRESSED_OOPS; + } + + /** Package-private on purpose, for tests... */ + static boolean isCompressedKlassPointersEnabled() { + return COMPRESSED_CLASS_POINTERS; + } + + private static boolean getBooleanVmOption(String optionName) { + String optionValue = + ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class).getVMOption(optionName).getValue(); + LOGGER.debug("VM option {} has value: {}", optionName, optionValue); + return Boolean.parseBoolean(optionValue); + } +} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/memory/InstanceSizeEstimator.java b/internal/venice-common/src/main/java/com/linkedin/venice/memory/InstanceSizeEstimator.java new file mode 100644 index 0000000000..0c67049ae9 --- /dev/null +++ b/internal/venice-common/src/main/java/com/linkedin/venice/memory/InstanceSizeEstimator.java @@ -0,0 +1,180 @@ +package com.linkedin.venice.memory; + +import static com.linkedin.venice.memory.ClassSizeEstimator.ARRAY_HEADER_SIZE; +import static com.linkedin.venice.memory.ClassSizeEstimator.getClassOverhead; + +import com.linkedin.venice.kafka.protocol.ControlMessage; +import com.linkedin.venice.kafka.protocol.Delete; +import com.linkedin.venice.kafka.protocol.GUID; +import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; +import com.linkedin.venice.kafka.protocol.LeaderMetadata; +import com.linkedin.venice.kafka.protocol.ProducerMetadata; +import com.linkedin.venice.kafka.protocol.Put; +import com.linkedin.venice.kafka.protocol.Update; +import com.linkedin.venice.kafka.protocol.enums.ControlMessageType; +import com.linkedin.venice.kafka.protocol.enums.MessageType; +import com.linkedin.venice.writer.VeniceWriter; +import java.nio.ByteBuffer; +import java.util.function.ToIntFunction; +import javax.annotation.Nonnull; + + +/** + * This utility class provides functions to measure the heap size of objects for a limited number of classes. Wherever + * possible, the logic makes use of knowledge of the Venice code so that shared or static instances are ignored (i.e., + * assuming that their amortized cost is negligible).
The code here should all be hot path friendly, and make no use of + * reflection (except in static constants). + * + * If more classes require heap size measurement, the preferred approach is NOT to add code into this class! Rather, the + * preferred approach is to implement {@link Measurable} and write the logic in {@link Measurable#getHeapSize()}. This + * utility class is for cases where we wish to measure classes that we cannot modify, such as those coming from the JDK, + * from third-party libraries, or from code-gen. + */ +public class InstanceSizeEstimator { + private static final ClassValue> CACHE = new ClassValue>() { + @Override + protected ToIntFunction computeValue(Class type) { + if (Measurable.class.isAssignableFrom(type)) { + return value -> ((Measurable) value).getHeapSize(); + } else if (Put.class.isAssignableFrom(type)) { + return value -> getSize((Put) value); + } else if (Delete.class.isAssignableFrom(type)) { + return value -> getSize((Delete) value); + } else if (Update.class.isAssignableFrom(type)) { + return value -> getSize((Update) value); + } else if (ControlMessage.class.isAssignableFrom(type)) { + return value -> getSize((ControlMessage) value); + } else if (KafkaMessageEnvelope.class.isAssignableFrom(type)) { + return value -> getSize((KafkaMessageEnvelope) value); + } else if (ProducerMetadata.class.isAssignableFrom(type)) { + return value -> getSize((ProducerMetadata) value); + } else if (ByteBuffer.class.isAssignableFrom(type)) { + return value -> getSize((ByteBuffer) value); + } else if (type.isArray() && type.getComponentType().equals(byte.class)) { + return value -> getSize((byte[]) value); + } + return null; + } + }; + private static final int GUID_FULL_CLASS_OVERHEAD = + getClassOverhead(GUID.class) + getByteArraySizeByLength(GUID.getClassSchema().getFixedSize()); + private static final int PRODUCER_METADATA_FULL_CLASS_OVERHEAD = + getClassOverhead(ProducerMetadata.class) + GUID_FULL_CLASS_OVERHEAD; + private static final int KME_PARTIAL_CLASS_OVERHEAD = + getClassOverhead(KafkaMessageEnvelope.class) + PRODUCER_METADATA_FULL_CLASS_OVERHEAD; + private static final int PUT_SHALLOW_CLASS_OVERHEAD = getClassOverhead(Put.class); + private static final int UPDATE_SHALLOW_CLASS_OVERHEAD = getClassOverhead(Update.class); + private static final int DELETE_SHALLOW_CLASS_OVERHEAD = getClassOverhead(Delete.class); + private static final int CONTROL_MESSAGE_SHALLOW_CLASS_OVERHEAD = getClassOverhead(ControlMessage.class); + private static final int LEADER_METADATA_SHALLOW_CLASS_OVERHEAD = getClassOverhead(LeaderMetadata.class); + private static final int BYTE_BUFFER_SHALLOW_CLASS_OVERHEAD = getClassOverhead(ByteBuffer.class); + + private InstanceSizeEstimator() { + // Static utility + } + + /** + * Works for {@link Measurable} objects and a small number of other types. + * + * Not intended as a generic utility for any instance type! + * + * @throws IllegalArgumentException when an unsupported type is passed. 
+ */ + public static int getObjectSize(@Nonnull Object o) { + if (o instanceof Measurable) { + return ((Measurable) o).getHeapSize(); + } + ToIntFunction sizeComputation = CACHE.get(o.getClass()); + if (sizeComputation == null) { + throw new IllegalArgumentException("Object of type " + o.getClass() + " is not measurable!"); + } + return sizeComputation.applyAsInt(o); + } + + public static int getByteArraySizeByLength(int length) { + return ClassSizeEstimator.roundUpToNearestAlignment(ARRAY_HEADER_SIZE + length); + } + + public static int getSize(@Nonnull byte[] bytes) { + return getByteArraySizeByLength(bytes.length); + } + + public static int getSize(ByteBuffer byteBuffer) { + if (byteBuffer == null) { + return 0; + } + if (byteBuffer.hasArray()) { + return BYTE_BUFFER_SHALLOW_CLASS_OVERHEAD + getByteArraySizeByLength(byteBuffer.capacity()); + } + + throw new IllegalArgumentException("Only array-backed ByteBuffers are measurable with this function."); + } + + public static int getSize(@Nonnull ProducerMetadata producerMetadata) { + return PRODUCER_METADATA_FULL_CLASS_OVERHEAD; + } + + public static int getSize(@Nonnull Put put) { + int size = PUT_SHALLOW_CLASS_OVERHEAD; + size += getSize(put.putValue); + if (put.replicationMetadataPayload != null) { + size += BYTE_BUFFER_SHALLOW_CLASS_OVERHEAD; + if (put.replicationMetadataPayload.array() != put.putValue.array()) { + /** + * N.B.: When using the {@link org.apache.avro.io.OptimizedBinaryDecoder}, the {@link put.putValue} and the + * {@link put.replicationMetadataPayload} will be backed by the same underlying array. If that is the + * case, then we don't want to account for the capacity twice. + */ + size += put.replicationMetadataPayload.capacity(); + } + } + return size; + } + + public static int getSize(@Nonnull Delete delete) { + return DELETE_SHALLOW_CLASS_OVERHEAD + getSize(delete.replicationMetadataPayload); + } + + /** + * This function is imprecise in a couple of ways. The {@link ControlMessage#controlMessageUnion} field is treated as + * shallow, which in some cases is false (e.g. if a compression dictionary were present), and the + * {@link ControlMessage#debugInfo} is ignored (i.e. treated as null). + * + * We can be more precise by looking at the {@link ControlMessage#controlMessageType} and then providing the precise + * overhead based on each type, but we're skipping this work for now since, in our use case, control messages should + * be a negligible fraction of all messages, and therefore not that important to get exactly right. + */ + public static int getSize(@Nonnull ControlMessage cm) { + return CONTROL_MESSAGE_SHALLOW_CLASS_OVERHEAD + ControlMessageType.valueOf(cm).getShallowClassOverhead(); + } + + public static int getSize(@Nonnull Update update) { + return UPDATE_SHALLOW_CLASS_OVERHEAD + getSize(update.updateValue); + } + + /** + * Measure the heap usage of {@link KafkaMessageEnvelope}. + */ + public static int getSize(KafkaMessageEnvelope kme) { + int size = KME_PARTIAL_CLASS_OVERHEAD; + switch (MessageType.valueOf(kme)) { + case PUT: + size += getSize((Put) kme.payloadUnion); + break; + case DELETE: + size += getSize((Delete) kme.payloadUnion); + break; + case CONTROL_MESSAGE: + size += getSize((ControlMessage) kme.payloadUnion); + break; + case UPDATE: + size += getSize((Update) kme.payloadUnion); + break; + } + if (kme.leaderMetadataFooter != null && !(kme.leaderMetadataFooter instanceof VeniceWriter.DefaultLeaderMetadata)) { + /** The host name part of this object should always be shared, hence we ignore it. 
*/ + size += LEADER_METADATA_SHALLOW_CLASS_OVERHEAD; + } + return size; + } +} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/memory/Measurable.java b/internal/venice-common/src/main/java/com/linkedin/venice/memory/Measurable.java new file mode 100644 index 0000000000..f61713173c --- /dev/null +++ b/internal/venice-common/src/main/java/com/linkedin/venice/memory/Measurable.java @@ -0,0 +1,5 @@ +package com.linkedin.venice.memory; + +public interface Measurable { + int getHeapSize(); +} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/message/KafkaKey.java b/internal/venice-common/src/main/java/com/linkedin/venice/message/KafkaKey.java index 163227f4d5..ae5971c7f0 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/message/KafkaKey.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/message/KafkaKey.java @@ -3,6 +3,9 @@ import com.linkedin.venice.guid.HeartbeatGuidV3Generator; import com.linkedin.venice.kafka.protocol.GUID; import com.linkedin.venice.kafka.protocol.enums.MessageType; +import com.linkedin.venice.memory.ClassSizeEstimator; +import com.linkedin.venice.memory.InstanceSizeEstimator; +import com.linkedin.venice.memory.Measurable; import com.linkedin.venice.utils.ByteUtils; import java.nio.ByteBuffer; import javax.annotation.Nonnull; @@ -13,7 +16,8 @@ * Class which stores the components of a Kafka Key, and is the format specified in the * {@link com.linkedin.venice.serialization.KafkaKeySerializer}. */ -public class KafkaKey { +public class KafkaKey implements Measurable { + private static final int SHALLOW_CLASS_OVERHEAD = ClassSizeEstimator.getClassOverhead(KafkaKey.class); /** * For control messages, the Key part of the {@link KafkaKey} includes the producer GUID, segment and sequence number. * @@ -32,11 +36,11 @@ public class KafkaKey { private final byte keyHeaderByte; private final byte[] key; // TODO: Consider whether we may want to use a ByteBuffer here - public KafkaKey(@Nonnull MessageType messageType, byte[] key) { + public KafkaKey(@Nonnull MessageType messageType, @Nonnull byte[] key) { this(messageType.getKeyHeaderByte(), key); } - public KafkaKey(byte keyHeaderByte, byte[] key) { + public KafkaKey(byte keyHeaderByte, @Nonnull byte[] key) { this.keyHeaderByte = keyHeaderByte; this.key = key; } @@ -78,9 +82,7 @@ public String toString() { + ByteUtils.toHexString(key) + ")"; } - public int getEstimatedObjectSizeOnHeap() { - // This constant is the estimated size of the enclosing object + the byte[]'s overhead. - // TODO: Find a library that would allow us to precisely measure this and store it in a static constant. 
- return getKeyLength() + 36; + public int getHeapSize() { + return SHALLOW_CLASS_OVERHEAD + InstanceSizeEstimator.getSize(this.key); } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/ImmutablePubSubMessage.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/ImmutablePubSubMessage.java index b15ec83e4f..caa73c591e 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/ImmutablePubSubMessage.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/ImmutablePubSubMessage.java @@ -1,5 +1,7 @@ package com.linkedin.venice.pubsub; +import com.linkedin.venice.memory.ClassSizeEstimator; +import com.linkedin.venice.memory.InstanceSizeEstimator; import com.linkedin.venice.pubsub.api.PubSubMessage; import com.linkedin.venice.pubsub.api.PubSubMessageHeaders; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; @@ -7,6 +9,7 @@ public class ImmutablePubSubMessage implements PubSubMessage { + private static final int SHALLOW_CLASS_OVERHEAD = ClassSizeEstimator.getClassOverhead(ImmutablePubSubMessage.class); private final K key; private final V value; private final PubSubTopicPartition topicPartition; @@ -87,4 +90,11 @@ public PubSubMessageHeaders getPubSubMessageHeaders() { public String toString() { return "PubSubMessage{" + topicPartition + ", offset=" + offset + ", timestamp=" + timestamp + '}'; } + + @Override + public int getHeapSize() { + /** The {@link #topicPartition} is supposed to be a shared instance, and is therefore ignored. */ + return SHALLOW_CLASS_OVERHEAD + InstanceSizeEstimator.getObjectSize(key) + + InstanceSizeEstimator.getObjectSize(value); + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/kafka/ApacheKafkaUtils.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/kafka/ApacheKafkaUtils.java index 9b0ea6b3a6..f024e4e404 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/kafka/ApacheKafkaUtils.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/kafka/ApacheKafkaUtils.java @@ -1,5 +1,6 @@ package com.linkedin.venice.pubsub.adapter.kafka; +import com.linkedin.venice.pubsub.api.PubSubMessageHeader; import com.linkedin.venice.pubsub.api.PubSubMessageHeaders; import org.apache.kafka.common.header.internals.RecordHeaders; @@ -7,12 +8,18 @@ public class ApacheKafkaUtils { public static final RecordHeaders EMPTY_RECORD_HEADERS = new RecordHeaders(); + static { + EMPTY_RECORD_HEADERS.setReadOnly(); + } + public static RecordHeaders convertToKafkaSpecificHeaders(PubSubMessageHeaders headers) { - if (headers == null) { + if (headers == null || headers.isEmpty()) { return EMPTY_RECORD_HEADERS; } RecordHeaders recordHeaders = new RecordHeaders(); - headers.toList().forEach(header -> recordHeaders.add(header.key(), header.value())); + for (PubSubMessageHeader header: headers) { + recordHeaders.add(header.key(), header.value()); + } return recordHeaders; } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/EmptyPubSubMessageHeaders.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/EmptyPubSubMessageHeaders.java index cea4522bcf..141f025e5d 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/EmptyPubSubMessageHeaders.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/EmptyPubSubMessageHeaders.java @@ -31,4 +31,14 @@ public PubSubMessageHeaders remove(String key) { 
public List toList() { return Collections.emptyList(); } + + public boolean isEmpty() { + return true; + } + + @Override + public int getHeapSize() { + // This is the point of using a singleton! + return 0; + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessage.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessage.java index c910efb11e..41c3167ed4 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessage.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessage.java @@ -1,6 +1,9 @@ package com.linkedin.venice.pubsub.api; -public interface PubSubMessage { +import com.linkedin.venice.memory.Measurable; + + +public interface PubSubMessage extends Measurable { /** * @return the key part of this message */ diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java index 88c539da53..36970faa78 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java @@ -57,7 +57,7 @@ public PubSubMessage deserialize( KafkaKey key = keySerializer.deserialize(null, keyBytes); KafkaMessageEnvelope value = null; if (key.isControlMessage()) { - for (PubSubMessageHeader header: headers.toList()) { + for (PubSubMessageHeader header: headers) { // only process VENICE_TRANSPORT_PROTOCOL_HEADER here. Other headers will be stored in // ImmutablePubSubMessage and used down the ingestion path later if (header.key().equals(VENICE_TRANSPORT_PROTOCOL_HEADER)) { diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeader.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeader.java index d4a604d617..49e7e40b43 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeader.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeader.java @@ -1,6 +1,6 @@ package com.linkedin.venice.pubsub.api; -import com.linkedin.venice.common.Measurable; +import com.linkedin.venice.memory.Measurable; import java.util.Arrays; import java.util.Objects; @@ -47,7 +47,7 @@ public boolean equals(Object otherObj) { * TODO: the following estimation doesn't consider the overhead of the internal structure. 
 */
  @Override
-  public int getSize() {
+  public int getHeapSize() {
    return key.length() + value.length;
  }
}
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeaders.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeaders.java
index 828f766948..aa9ce1e042 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeaders.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeaders.java
@@ -1,22 +1,26 @@
 package com.linkedin.venice.pubsub.api;

-import com.linkedin.venice.common.Measurable;
+import static com.linkedin.venice.memory.ClassSizeEstimator.getClassOverhead;
+
+import com.linkedin.venice.memory.Measurable;
+import com.linkedin.venice.utils.collections.MeasurableLinkedHashMap;
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
-import java.util.Map;

 /**
  * Set of key-value pairs to be tagged with messages produced to a topic.
  * In case of headers with the same key, only the most recently added header's value will be kept.
  */
-public class PubSubMessageHeaders implements Measurable {
+public class PubSubMessageHeaders implements Measurable, Iterable {
+  private static final int SHALLOW_CLASS_OVERHEAD = getClassOverhead(PubSubMessageHeaders.class);
   /**
    * N.B.: Kafka allows duplicate keys in the headers but some pubsub systems may not
    * allow it. Hence, we will enforce uniqueness of keys in headers from the beginning.
    */
-  private final Map headers = new LinkedHashMap<>();
+  private final MeasurableLinkedHashMap headers = new MeasurableLinkedHashMap<>();

   public static final String VENICE_TRANSPORT_PROTOCOL_HEADER = "vtp";
   /** Header to denote whether the leader is completed or not */
@@ -43,18 +47,20 @@ public PubSubMessageHeaders remove(String key) {
    * If no headers are present an empty list is returned.
    */
   public List toList() {
-    return new ArrayList<>(headers.values());
+    return headers.isEmpty() ? Collections.emptyList() : new ArrayList<>(headers.values());
+  }
+
+  public boolean isEmpty() {
+    return headers.isEmpty();
+  }
+
+  @Override
+  public int getHeapSize() {
+    return SHALLOW_CLASS_OVERHEAD + this.headers.getHeapSize();
   }

-  /**
-   * TODO: the following estimation doesn't consider the overhead of the internal structure.
- */ @Override - public int getSize() { - int size = 0; - for (Map.Entry entry: headers.entrySet()) { - size += entry.getKey().length() + entry.getValue().getSize(); - } - return size; + public Iterator iterator() { + return headers.values().iterator(); } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubProducerAdapterConcurrentDelegator.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubProducerAdapterConcurrentDelegator.java index aa59261448..c0b019d4ed 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubProducerAdapterConcurrentDelegator.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubProducerAdapterConcurrentDelegator.java @@ -1,11 +1,13 @@ package com.linkedin.venice.pubsub.api; -import com.linkedin.venice.common.Measurable; +import static com.linkedin.venice.memory.ClassSizeEstimator.getClassOverhead; + import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; +import com.linkedin.venice.memory.InstanceSizeEstimator; +import com.linkedin.venice.memory.Measurable; import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.utils.DaemonThreadFactory; -import com.linkedin.venice.utils.ProtocolUtils; import com.linkedin.venice.utils.collections.MemoryBoundBlockingQueue; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import it.unimi.dsi.fastutil.objects.Object2DoubleMap; @@ -120,8 +122,9 @@ public PubSubProducerAdapterConcurrentDelegator( } public static class ProducerQueueNode implements Measurable { - // Rough estimation. - private static final int PRODUCER_QUEUE_NODE_OVERHEAD = 256; + /** We assume that the {@link #produceFuture} is empty, and the {@link #topic} is a shared instance. */ + private static final int PRODUCER_QUEUE_NODE_PARTIAL_OVERHEAD = + getClassOverhead(ProducerQueueNode.class) + getClassOverhead(CompletableFuture.class); private final String topic; private final int partition; @@ -149,10 +152,13 @@ public ProducerQueueNode( } @Override - public int getSize() { - return topic.length() + 4 + key.getEstimatedObjectSizeOnHeap() - + ProtocolUtils.getEstimateOfMessageEnvelopeSizeOnHeap(value) + headers.getSize() - + PRODUCER_QUEUE_NODE_OVERHEAD; + public int getHeapSize() { + int size = PRODUCER_QUEUE_NODE_PARTIAL_OVERHEAD + key.getHeapSize() + InstanceSizeEstimator.getSize(value) + + headers.getHeapSize(); + if (this.callback instanceof Measurable) { + size += ((Measurable) this.callback).getHeapSize(); + } + return size; } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/utils/ProtocolUtils.java b/internal/venice-common/src/main/java/com/linkedin/venice/utils/ProtocolUtils.java deleted file mode 100644 index 3b8ccf9271..0000000000 --- a/internal/venice-common/src/main/java/com/linkedin/venice/utils/ProtocolUtils.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.linkedin.venice.utils; - -import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; -import com.linkedin.venice.kafka.protocol.Put; -import com.linkedin.venice.kafka.protocol.Update; -import com.linkedin.venice.kafka.protocol.enums.MessageType; - - -public class ProtocolUtils { - /** - * Measure the heap usage of {@link KafkaMessageEnvelope}. - */ - public static int getEstimateOfMessageEnvelopeSizeOnHeap(KafkaMessageEnvelope messageEnvelope) { - int kmeBaseOverhead = 100; // Super rough estimate. 
TODO: Measure with a more precise library and store statically
-    switch (MessageType.valueOf(messageEnvelope)) {
-      case PUT:
-        Put put = (Put) messageEnvelope.payloadUnion;
-        int size = put.putValue.capacity();
-        if (put.replicationMetadataPayload != null
-            /**
-             * N.B.: When using the {@link org.apache.avro.io.OptimizedBinaryDecoder}, the {@link put.putValue} and the
-             * {@link put.replicationMetadataPayload} will be backed by the same underlying array. If that is the
-             * case, then we don't want to account for the capacity twice.
-             */
-            && put.replicationMetadataPayload.array() != put.putValue.array()) {
-          size += put.replicationMetadataPayload.capacity();
-        }
-        return size + kmeBaseOverhead;
-      case UPDATE:
-        Update update = (Update) messageEnvelope.payloadUnion;
-        return update.updateValue.capacity() + kmeBaseOverhead;
-      default:
-        return kmeBaseOverhead;
-    }
-  }
-}
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/utils/collections/MeasurableLinkedHashMap.java b/internal/venice-common/src/main/java/com/linkedin/venice/utils/collections/MeasurableLinkedHashMap.java
new file mode 100644
index 0000000000..cffeb90fc8
--- /dev/null
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/utils/collections/MeasurableLinkedHashMap.java
@@ -0,0 +1,107 @@
+package com.linkedin.venice.utils.collections;
+
+import static com.linkedin.venice.memory.ClassSizeEstimator.getClassOverhead;
+import static com.linkedin.venice.memory.InstanceSizeEstimator.getByteArraySizeByLength;
+
+import com.linkedin.venice.memory.Measurable;
+import java.util.LinkedHashMap;
+
+
+/**
+ * A subclass of {@link LinkedHashMap} which makes a best-effort attempt at guessing its size on heap + the size of its
+ * values. Several assumptions are made which could make this imprecise in some contexts. See
+ * {@link com.linkedin.venice.pubsub.api.PubSubMessageHeaders} for the usage it was originally intended for...
+ */
+public class MeasurableLinkedHashMap extends LinkedHashMap implements Measurable {
+  private static final int SHALLOW_CLASS_OVERHEAD = getClassOverhead(MeasurableLinkedHashMap.class);
+  private static final int ENTRY_CLASS_OVERHEAD = getClassOverhead(PseudoLinkedHashMapEntry.class);
+  /**
+   * The reason to have {@link PseudoLinkedValues} is that if we call {@link #getHeapSize()} then we will iterate over
+   * the values and this causes a cached view of the map to be held, thus creating one more object...
+   */
+  private static final int LINKED_VALUES_SHALLOW_CLASS_OVERHEAD =
+      getClassOverhead(new MeasurableLinkedHashMap().getPseudoLinkedValues().getClass());
+
+  /**
+   * The default initial capacity - MUST be a power of two.
+   */
+  private static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16
+
+  /**
+   * The maximum capacity, used if a higher value is implicitly specified
+   * by either of the constructors with arguments.
+   * MUST be a power of two <= 1<<30.
+   */
+  private static final int MAXIMUM_CAPACITY = 1 << 30;
+
+  /**
+   * The load factor used when none specified in constructor.
+   */
+  private static final float DEFAULT_LOAD_FACTOR = 0.75f;
+
+  private int guessCurrentlyAllocatedTableSize() {
+    if (isEmpty()) {
+      /**
+       * This is not necessarily true... if a map had entries in it, and then they were all removed, then the table
+       * would be empty but its capacity would still be whatever it was when it had the most entries, since maps grow
+       * but don't shrink.
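+       * (For the non-empty case, the loop below simply replays the default growth policy; as a worked example, a map
+       * holding 13 entries is assumed to sit in a 32-slot table, since 13 exceeds the first resize threshold of 12,
+       * i.e. 16 * 0.75.)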
Tracking this accurately would require making this subclass stateful, which would add complexity + * and may not be a good trade. So we take a naive approach instead. + */ + return 0; + } + + int assumedCapacity = DEFAULT_INITIAL_CAPACITY; + int assumedThreshold = (int) (DEFAULT_LOAD_FACTOR * (float) assumedCapacity); + while (assumedCapacity <= MAXIMUM_CAPACITY) { + if (size() < assumedThreshold) { + return assumedCapacity; + } + + // double assumed capacity and threshold + assumedCapacity = assumedCapacity << 1; + assumedThreshold = assumedThreshold << 1; + } + return MAXIMUM_CAPACITY; + } + + @Override + public int getHeapSize() { + int size = SHALLOW_CLASS_OVERHEAD; + int tableSize = getByteArraySizeByLength(guessCurrentlyAllocatedTableSize()); + if (tableSize > 0) { + size += tableSize; + size += size() * ENTRY_CLASS_OVERHEAD; + for (V value: values()) { + /** + * N.B.: We only consider the value size, and ignore the key size. In our initial use case for this class, this + * is okay, though technically not correct. + */ + size += value.getHeapSize(); + } + /** + * Unfortunately, the act of calling {@link #values()} ends up generating a new object, and storing it in a class + * field, so we must now account for it... + */ + size += LINKED_VALUES_SHALLOW_CLASS_OVERHEAD; + } + + return size; + } + + // The classes below are for the sake of measuring overhead, since the equivalent JDK classes have limited visibility + static class PseudoHashMapNode { + int hash; + Object key, value, next; + } + + static class PseudoLinkedHashMapEntry extends PseudoHashMapNode { + Object before, after; + } + + private PseudoLinkedValues getPseudoLinkedValues() { + return new PseudoLinkedValues(); + } + + final class PseudoLinkedValues { + } +} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/utils/collections/MemoryBoundBlockingQueue.java b/internal/venice-common/src/main/java/com/linkedin/venice/utils/collections/MemoryBoundBlockingQueue.java index 6e51a4d17d..8206d00016 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/utils/collections/MemoryBoundBlockingQueue.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/utils/collections/MemoryBoundBlockingQueue.java @@ -1,7 +1,9 @@ package com.linkedin.venice.utils.collections; -import com.linkedin.venice.common.Measurable; +import static com.linkedin.venice.memory.ClassSizeEstimator.getClassOverhead; + import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.memory.Measurable; import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; @@ -46,11 +48,9 @@ public class MemoryBoundBlockingQueue implements BlockingQueue { private static final Logger LOGGER = LogManager.getLogger(MemoryBoundBlockingQueue.class); /** - * Considering the node implementation: {@link java.util.LinkedList.Node}, the overhead - * is three references, which could be about 24 bytes, and the 'Node' object type itself could take 24 bytes. - * We can adjust this value later if necessary. + * Considering the node implementation: {@link java.util.LinkedList.Node}, the overhead is three references. 
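+ * On a typical 64-bit JVM with compressed class pointers and compressed oops, that likely works out to 24 bytes
+ * (a 12-byte header plus three 4-byte references, already 8-byte aligned), but rather than hard-coding that guess,
+ * the constant below is computed for whichever JVM is actually running.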
*/ - public static final int LINKED_QUEUE_NODE_OVERHEAD_IN_BYTE = 48; + public static final int LINKED_LIST_NODE_SHALLOW_OVERHEAD = getClassOverhead(LinkedListQueueNodeSimulation.class); private final Queue queue; private final long memoryCapacityInByte; @@ -87,7 +87,7 @@ public long remainingMemoryCapacityInByte() { } private long getRecordSize(T record) { - return record.getSize() + LINKED_QUEUE_NODE_OVERHEAD_IN_BYTE; + return record.getHeapSize() + LINKED_LIST_NODE_SHALLOW_OVERHEAD; } @Override @@ -252,4 +252,11 @@ public int drainTo(Collection c) { public int drainTo(Collection c, int maxElements) { throw new VeniceException("Operation is not supported yet!"); } + + /** + * Only used to measure the size on heap of a class with three references. + */ + private static final class LinkedListQueueNodeSimulation { + private Object field1, field2, field3; + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/CompletableFutureCallback.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/CompletableFutureCallback.java index 501ef523e9..6da8e1459f 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/CompletableFutureCallback.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/CompletableFutureCallback.java @@ -1,5 +1,8 @@ package com.linkedin.venice.writer; +import static com.linkedin.venice.memory.ClassSizeEstimator.getClassOverhead; + +import com.linkedin.venice.memory.Measurable; import com.linkedin.venice.pubsub.api.PubSubProduceResult; import com.linkedin.venice.pubsub.api.PubSubProducerCallback; import java.util.concurrent.CompletableFuture; @@ -11,7 +14,10 @@ * changed and the callback will be called. The caller can pass a {@code CompletableFutureCallback} to a function * accepting a {@code Callback} parameter to get a {@code CompletableFuture} after the function returns. 
*/ -public class CompletableFutureCallback implements PubSubProducerCallback { +public class CompletableFutureCallback implements PubSubProducerCallback, Measurable { + private static final int SHALLOW_CLASS_OVERHEAD = getClassOverhead(CompletableFutureCallback.class); + private static final int COMPLETABLE_FUTURE_SHALLOW_CLASS_OVERHEAD = getClassOverhead(CompletableFuture.class); + private final CompletableFuture completableFuture; private PubSubProducerCallback callback = null; @@ -36,4 +42,16 @@ public PubSubProducerCallback getCallback() { public void setCallback(PubSubProducerCallback callback) { this.callback = callback; } + + @Override + public int getHeapSize() { + int size = SHALLOW_CLASS_OVERHEAD; + if (this.completableFuture != null) { + size += COMPLETABLE_FUTURE_SHALLOW_CLASS_OVERHEAD; + } + if (this.callback instanceof Measurable) { + size += ((Measurable) this.callback).getHeapSize(); + } + return size; + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/SendMessageErrorLoggerCallback.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/SendMessageErrorLoggerCallback.java index 23adbe403e..ac6ab2bc1e 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/SendMessageErrorLoggerCallback.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/SendMessageErrorLoggerCallback.java @@ -1,12 +1,16 @@ package com.linkedin.venice.writer; +import static com.linkedin.venice.memory.ClassSizeEstimator.getClassOverhead; + import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; +import com.linkedin.venice.memory.Measurable; import com.linkedin.venice.pubsub.api.PubSubProduceResult; import com.linkedin.venice.pubsub.api.PubSubProducerCallback; import org.apache.logging.log4j.Logger; -class SendMessageErrorLoggerCallback implements PubSubProducerCallback { +class SendMessageErrorLoggerCallback implements PubSubProducerCallback, Measurable { + private static final int SHALLOW_CLASS_OVERHEAD = getClassOverhead(SendMessageErrorLoggerCallback.class); private final KafkaMessageEnvelope value; private final Logger logger; @@ -25,4 +29,13 @@ public void onCompletion(PubSubProduceResult produceResult, Exception e) { e); } } + + /** + * N.B.: For this use case, the shallow overhead is expected to be the relevant size, since both of the fields should + * point to shared instances. 
+ */ + @Override + public int getHeapSize() { + return SHALLOW_CLASS_OVERHEAD; + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java index b88cdd8eee..5d8cd79cb2 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java @@ -261,7 +261,18 @@ public class VeniceWriter extends AbstractVeniceWriter { private final Map defaultDebugInfo; private final boolean elapsedTimeForClosingSegmentEnabled; private final Object[] partitionLocks; - private String writerId; + private final String writerId; + + public static class DefaultLeaderMetadata extends LeaderMetadata { + public DefaultLeaderMetadata(CharSequence hostName) { + this.hostName = hostName; + this.upstreamOffset = DEFAULT_LEADER_METADATA_WRAPPER.getUpstreamOffset(); + this.upstreamKafkaClusterId = DEFAULT_LEADER_METADATA_WRAPPER.getUpstreamKafkaClusterId(); + } + } + + /** Used to reduce memory allocation in cases where the metadata is always going to be the same. */ + private final DefaultLeaderMetadata defaultLeaderMetadata; private volatile boolean isClosed = false; private final Object closeLock = new Object(); @@ -342,13 +353,12 @@ public VeniceWriter( // if INSTANCE_ID is not set, we'd use "hostname:port" as the default writer id if (props.containsKey(INSTANCE_ID)) { this.writerId = props.getString(INSTANCE_ID); + } else if (props.containsKey(LISTENER_PORT)) { + this.writerId = Utils.getHostName() + ":" + props.getInt(LISTENER_PORT); } else { this.writerId = Utils.getHostName(); - - if (props.containsKey(LISTENER_PORT)) { - this.writerId += ":" + props.getInt(LISTENER_PORT); - } } + this.defaultLeaderMetadata = new DefaultLeaderMetadata(this.writerId); this.producerGUID = GuidUtils.getGUID(props); this.logger = LogManager.getLogger("VeniceWriter [" + GuidUtils.getHexFromGuid(producerGUID) + "]"); // Create a thread pool which can have max 2 threads. 
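A minimal usage sketch of the shared-footer pattern introduced above (hypothetical code, not part of the patch; "host:1234" is a made-up writer id, and the actual wiring happens in getKafkaMessageEnvelope below):

    // One shared, effectively-immutable footer instance...
    VeniceWriter.DefaultLeaderMetadata sharedFooter = new VeniceWriter.DefaultLeaderMetadata("host:1234");
    // ...can be attached to every envelope produced with DEFAULT_LEADER_METADATA_WRAPPER,
    // instead of allocating a fresh LeaderMetadata per message:
    KafkaMessageEnvelope kme = new KafkaMessageEnvelope();
    kme.leaderMetadataFooter = sharedFooter;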
@@ -1435,7 +1445,7 @@ private PubSubMessageHeaders getHeaders( if (addLeaderCompleteState) { // copy protocolSchemaHeaders locally and add extra header for leaderCompleteState returnPubSubMessageHeaders = new PubSubMessageHeaders(); - for (PubSubMessageHeader header: pubSubMessageHeaders.toList()) { + for (PubSubMessageHeader header: pubSubMessageHeaders) { returnPubSubMessageHeaders.add(header); } returnPubSubMessageHeaders.add(getLeaderCompleteStateHeader(leaderCompleteState)); @@ -1948,10 +1958,14 @@ protected KafkaMessageEnvelope getKafkaMessageEnvelope( producerMetadata.messageTimestamp = time.getMilliseconds(); producerMetadata.logicalTimestamp = logicalTs; kafkaValue.producerMetadata = producerMetadata; - kafkaValue.leaderMetadataFooter = new LeaderMetadata(); - kafkaValue.leaderMetadataFooter.hostName = writerId; - kafkaValue.leaderMetadataFooter.upstreamOffset = leaderMetadataWrapper.getUpstreamOffset(); - kafkaValue.leaderMetadataFooter.upstreamKafkaClusterId = leaderMetadataWrapper.getUpstreamKafkaClusterId(); + if (leaderMetadataWrapper == DEFAULT_LEADER_METADATA_WRAPPER) { + kafkaValue.leaderMetadataFooter = this.defaultLeaderMetadata; + } else { + kafkaValue.leaderMetadataFooter = new LeaderMetadata(); + kafkaValue.leaderMetadataFooter.hostName = writerId; + kafkaValue.leaderMetadataFooter.upstreamOffset = leaderMetadataWrapper.getUpstreamOffset(); + kafkaValue.leaderMetadataFooter.upstreamKafkaClusterId = leaderMetadataWrapper.getUpstreamKafkaClusterId(); + } return kafkaValue; } diff --git a/internal/venice-common/src/test/java/com/linkedin/davinci/kafka/consumer/SBSQueueNodeFactory.java b/internal/venice-common/src/test/java/com/linkedin/davinci/kafka/consumer/SBSQueueNodeFactory.java new file mode 100644 index 0000000000..66df20ab76 --- /dev/null +++ b/internal/venice-common/src/test/java/com/linkedin/davinci/kafka/consumer/SBSQueueNodeFactory.java @@ -0,0 +1,46 @@ +package com.linkedin.davinci.kafka.consumer; + +import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; +import com.linkedin.venice.message.KafkaKey; +import com.linkedin.venice.pubsub.api.PubSubMessage; + + +/** + * Simple utility class to get access to the package-private classes in {@link StoreBufferService} and their + * constructors. 
+ */
+public class SBSQueueNodeFactory {
+  public static StoreBufferService.QueueNode queueNode(
+      PubSubMessage consumerRecord,
+      StoreIngestionTask ingestionTask,
+      String kafkaUrl,
+      long beforeProcessingRecordTimestampNs) {
+    return new StoreBufferService.QueueNode(consumerRecord, ingestionTask, kafkaUrl, beforeProcessingRecordTimestampNs);
+  }
+
+  public static StoreBufferService.LeaderQueueNode leaderQueueNode(
+      PubSubMessage consumerRecord,
+      StoreIngestionTask ingestionTask,
+      String kafkaUrl,
+      long beforeProcessingRecordTimestampNs,
+      LeaderProducedRecordContext leaderProducedRecordContext) {
+    return new StoreBufferService.LeaderQueueNode(
+        consumerRecord,
+        ingestionTask,
+        kafkaUrl,
+        beforeProcessingRecordTimestampNs,
+        leaderProducedRecordContext);
+  }
+
+  public static Class queueNodeClass() {
+    return StoreBufferService.QueueNode.class;
+  }
+
+  public static Class leaderQueueNodeClass() {
+    return StoreBufferService.LeaderQueueNode.class;
+  }
+
+  private SBSQueueNodeFactory() {
+    // Utility class
+  }
+}
diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/memory/ClassSizeEstimatorTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/memory/ClassSizeEstimatorTest.java
new file mode 100644
index 0000000000..7493887198
--- /dev/null
+++ b/internal/venice-common/src/test/java/com/linkedin/venice/memory/ClassSizeEstimatorTest.java
@@ -0,0 +1,396 @@
+package com.linkedin.venice.memory;
+
+import static com.linkedin.venice.memory.ClassSizeEstimator.getClassOverhead;
+import static org.testng.Assert.assertThrows;
+
+import org.testng.annotations.Test;
+
+
+public class ClassSizeEstimatorTest extends HeapSizeEstimatorTest {
+  public ClassSizeEstimatorTest() {
+    super(SubSubClassWithThreePrimitiveBooleanFields.class);
+  }
+
+  @Test
+  public void testParamValidation() {
+    assertThrows(NullPointerException.class, () -> getClassOverhead(null));
+  }
+
+  @Test(dataProvider = "testMethodologies")
+  public void testClassOverhead(TestMethodology tm) {
+    printHeader(tm.resultsTableHeader);
+
+    TestFunction tf = tm.tfProvider.apply(this);
+
+    // Most basic case... just a plain Object.
+    tf.test(Object.class, 0);
+
+    // Ensure that inheritance (in and of itself) adds no overhead.
+    tf.test(SubclassOfObjectWithNoFields.class, 0);
+
+    // Ensure that one public primitive field within a single class is accounted.
+    tf.test(ClassWithOnePublicPrimitiveBooleanField.class, BOOLEAN_SIZE);
+    tf.test(ClassWithOnePublicPrimitiveByteField.class, Byte.BYTES);
+    tf.test(ClassWithOnePublicPrimitiveCharField.class, Character.BYTES);
+    tf.test(ClassWithOnePublicPrimitiveShortField.class, Short.BYTES);
+    tf.test(ClassWithOnePublicPrimitiveIntField.class, Integer.BYTES);
+    tf.test(ClassWithOnePublicPrimitiveFloatField.class, Float.BYTES);
+    tf.test(ClassWithOnePublicPrimitiveLongField.class, Long.BYTES);
+    tf.test(ClassWithOnePublicPrimitiveDoubleField.class, Double.BYTES);
+
+    // Ensure that two private primitive fields within a single class are accounted.
+    tf.test(ClassWithTwoPrimitiveBooleanFields.class, BOOLEAN_SIZE * 2);
+    tf.test(ClassWithTwoPrimitiveByteFields.class, Byte.BYTES * 2);
+    tf.test(ClassWithTwoPrimitiveCharFields.class, Character.BYTES * 2);
+    tf.test(ClassWithTwoPrimitiveShortFields.class, Short.BYTES * 2);
+    tf.test(ClassWithTwoPrimitiveIntFields.class, Integer.BYTES * 2);
+    tf.test(ClassWithTwoPrimitiveFloatFields.class, Float.BYTES * 2);
+    tf.test(ClassWithTwoPrimitiveLongFields.class, Long.BYTES * 2);
+    tf.test(ClassWithTwoPrimitiveDoubleFields.class, Double.BYTES * 2);
+
+    // Ensure that a mix of public and private fields across the class hierarchy are accounted.
+    if (JAVA_MAJOR_VERSION < 15) {
+      // TODO: Plug in correct expected field size for these JVMs...
+      tf.test(SubClassWithTwoPrimitiveBooleanFields.class);
+      tf.test(SubClassWithTwoPrimitiveByteFields.class);
+      tf.test(SubClassWithTwoPrimitiveCharFields.class);
+      tf.test(SubClassWithTwoPrimitiveShortFields.class);
+
+      tf.test(SubSubClassWithThreePrimitiveBooleanFields.class);
+      tf.test(SubSubClassWithThreePrimitiveByteFields.class);
+      tf.test(SubSubClassWithThreePrimitiveCharFields.class);
+      tf.test(SubSubClassWithThreePrimitiveShortFields.class);
+    } else {
+      tf.test(SubClassWithTwoPrimitiveBooleanFields.class, BOOLEAN_SIZE * 2);
+      tf.test(SubClassWithTwoPrimitiveByteFields.class, Byte.BYTES * 2);
+      tf.test(SubClassWithTwoPrimitiveCharFields.class, Character.BYTES * 2);
+      tf.test(SubClassWithTwoPrimitiveShortFields.class, Short.BYTES * 2);
+
+      tf.test(SubSubClassWithThreePrimitiveBooleanFields.class, BOOLEAN_SIZE * 3);
+      tf.test(SubSubClassWithThreePrimitiveByteFields.class, Byte.BYTES * 3);
+      tf.test(SubSubClassWithThreePrimitiveCharFields.class, Character.BYTES * 3);
+      tf.test(SubSubClassWithThreePrimitiveShortFields.class, Short.BYTES * 3);
+    }
+
+    tf.test(SubClassWithTwoPrimitiveIntFields.class, Integer.BYTES * 2);
+    tf.test(SubClassWithTwoPrimitiveFloatFields.class, Float.BYTES * 2);
+    tf.test(SubClassWithTwoPrimitiveLongFields.class, Long.BYTES * 2);
+    tf.test(SubClassWithTwoPrimitiveDoubleFields.class, Double.BYTES * 2);
+
+    tf.test(SubSubClassWithThreePrimitiveIntFields.class, Integer.BYTES * 3);
+    tf.test(SubSubClassWithThreePrimitiveFloatFields.class, Float.BYTES * 3);
+    tf.test(SubSubClassWithThreePrimitiveLongFields.class, Long.BYTES * 3);
+    tf.test(SubSubClassWithThreePrimitiveDoubleFields.class, Double.BYTES * 3);
+
+    // Ensure that pointers are properly accounted.
+    tf.test(ClassWithThreeObjectPointers.class, POINTER_SIZE * 3);
+
+    // Ensure that arrays are properly accounted.
+    tf.test(ClassWithArray.class, POINTER_SIZE);
+    tf.test(Object[].class, () -> new Object[0]);
+    tf.test(boolean[].class, () -> new boolean[0]);
+    tf.test(byte[].class, () -> new byte[0]);
+    tf.test(char[].class, () -> new char[0]);
+    tf.test(short[].class, () -> new short[0]);
+    tf.test(int[].class, () -> new int[0]);
+    tf.test(float[].class, () -> new float[0]);
+    tf.test(long[].class, () -> new long[0]);
+    tf.test(double[].class, () -> new double[0]);
+
+    /**
+     * Ensure that field packing and ordering is accounted.
+     *
+     * Note that we don't actually do anything special about packing and ordering, and things still work. The ordering
+     * can have implications in terms of padding fields to protect against false sharing, but does not appear to have
+     * a concrete effect on memory utilization (or at least, none of the current test cases exposes such an issue).
+     */
+    tf.test(FieldPacking.class);
+    tf.test(FieldOrder.class);
+
+    // Put it all together...
+ tf.test(ComplexClass.class, POINTER_SIZE * 10); + + printResultSeparatorLine(); + } + + private static class SubclassOfObjectWithNoFields { + public SubclassOfObjectWithNoFields() { + } + } + + private static class ClassWithOnePublicPrimitiveBooleanField { + public boolean publicField; + + public ClassWithOnePublicPrimitiveBooleanField() { + } + } + + private static class ClassWithOnePublicPrimitiveByteField { + public byte publicField; + + public ClassWithOnePublicPrimitiveByteField() { + } + } + + private static class ClassWithOnePublicPrimitiveCharField { + public char publicField; + + public ClassWithOnePublicPrimitiveCharField() { + } + } + + private static class ClassWithOnePublicPrimitiveShortField { + public short publicField; + + public ClassWithOnePublicPrimitiveShortField() { + } + } + + private static class ClassWithOnePublicPrimitiveIntField { + public int publicField; + + public ClassWithOnePublicPrimitiveIntField() { + } + } + + private static class ClassWithOnePublicPrimitiveFloatField { + public float publicField; + + public ClassWithOnePublicPrimitiveFloatField() { + } + } + + private static class ClassWithOnePublicPrimitiveLongField { + public long publicField; + + public ClassWithOnePublicPrimitiveLongField() { + } + } + + private static class ClassWithOnePublicPrimitiveDoubleField { + public double publicField; + + public ClassWithOnePublicPrimitiveDoubleField() { + } + } + + private static class ClassWithTwoPrimitiveBooleanFields { + private boolean field1, field2; + + public ClassWithTwoPrimitiveBooleanFields() { + } + } + + private static class ClassWithTwoPrimitiveByteFields { + private byte field1, field2; + + public ClassWithTwoPrimitiveByteFields() { + } + } + + private static class ClassWithTwoPrimitiveCharFields { + private char field1, field2; + + public ClassWithTwoPrimitiveCharFields() { + } + } + + private static class ClassWithTwoPrimitiveShortFields { + private short field1, field2; + + public ClassWithTwoPrimitiveShortFields() { + } + } + + private static class ClassWithTwoPrimitiveIntFields { + private int field1, field2; + + public ClassWithTwoPrimitiveIntFields() { + } + } + + private static class ClassWithTwoPrimitiveFloatFields { + private float field1, field2; + + public ClassWithTwoPrimitiveFloatFields() { + } + } + + private static class ClassWithTwoPrimitiveLongFields { + private long field1, field2; + + public ClassWithTwoPrimitiveLongFields() { + } + } + + private static class ClassWithTwoPrimitiveDoubleFields { + private double field1, field2; + + public ClassWithTwoPrimitiveDoubleFields() { + } + } + + private static class SubClassWithTwoPrimitiveBooleanFields extends ClassWithOnePublicPrimitiveBooleanField { + private boolean privateField; + + public SubClassWithTwoPrimitiveBooleanFields() { + } + } + + private static class SubClassWithTwoPrimitiveByteFields extends ClassWithOnePublicPrimitiveByteField { + private byte privateField; + + public SubClassWithTwoPrimitiveByteFields() { + } + } + + private static class SubClassWithTwoPrimitiveCharFields extends ClassWithOnePublicPrimitiveCharField { + private char privateField; + + public SubClassWithTwoPrimitiveCharFields() { + } + } + + private static class SubClassWithTwoPrimitiveShortFields extends ClassWithOnePublicPrimitiveShortField { + private short privateField; + + public SubClassWithTwoPrimitiveShortFields() { + } + } + + private static class SubClassWithTwoPrimitiveIntFields extends ClassWithOnePublicPrimitiveIntField { + private int privateField; + + public 
SubClassWithTwoPrimitiveIntFields() { + } + } + + private static class SubClassWithTwoPrimitiveFloatFields extends ClassWithOnePublicPrimitiveFloatField { + private float privateField; + + public SubClassWithTwoPrimitiveFloatFields() { + } + } + + private static class SubClassWithTwoPrimitiveLongFields extends ClassWithOnePublicPrimitiveLongField { + private long privateField; + + public SubClassWithTwoPrimitiveLongFields() { + } + } + + private static class SubClassWithTwoPrimitiveDoubleFields extends ClassWithOnePublicPrimitiveDoubleField { + private double privateField; + + public SubClassWithTwoPrimitiveDoubleFields() { + } + } + + private static class SubSubClassWithThreePrimitiveBooleanFields extends SubClassWithTwoPrimitiveBooleanFields { + private boolean privateField; + + public SubSubClassWithThreePrimitiveBooleanFields() { + } + } + + private static class SubSubClassWithThreePrimitiveByteFields extends SubClassWithTwoPrimitiveByteFields { + private byte privateField; + + public SubSubClassWithThreePrimitiveByteFields() { + } + } + + private static class SubSubClassWithThreePrimitiveCharFields extends SubClassWithTwoPrimitiveCharFields { + private char privateField; + + public SubSubClassWithThreePrimitiveCharFields() { + } + } + + private static class SubSubClassWithThreePrimitiveShortFields extends SubClassWithTwoPrimitiveShortFields { + private short privateField; + + public SubSubClassWithThreePrimitiveShortFields() { + } + } + + private static class SubSubClassWithThreePrimitiveIntFields extends SubClassWithTwoPrimitiveIntFields { + private int privateField; + + public SubSubClassWithThreePrimitiveIntFields() { + } + } + + private static class SubSubClassWithThreePrimitiveFloatFields extends SubClassWithTwoPrimitiveFloatFields { + private float privateField; + + public SubSubClassWithThreePrimitiveFloatFields() { + } + } + + private static class SubSubClassWithThreePrimitiveLongFields extends SubClassWithTwoPrimitiveLongFields { + private long privateField; + + public SubSubClassWithThreePrimitiveLongFields() { + } + } + + private static class SubSubClassWithThreePrimitiveDoubleFields extends SubClassWithTwoPrimitiveDoubleFields { + private double privateField; + + public SubSubClassWithThreePrimitiveDoubleFields() { + } + } + + private static class ClassWithThreeObjectPointers { + Object field1, field2, field3; + + public ClassWithThreeObjectPointers() { + } + } + + private static class ClassWithArray { + public Object[] array; + + public ClassWithArray() { + } + } + + /** See: https://shipilev.net/jvm/objects-inside-out/#_field_packing */ + private static class FieldPacking { + boolean b; + long l; + char c; + int i; + + public FieldPacking() { + } + } + + /** See: https://shipilev.net/jvm/objects-inside-out/#_observation_field_declaration_order_field_layout_order */ + private static class FieldOrder { + boolean firstField; + long secondField; + char thirdField; + int fourthField; + + public FieldOrder() { + } + } + + private static class ComplexClass { + ClassWithTwoPrimitiveBooleanFields field1; + ClassWithTwoPrimitiveByteFields field2; + ClassWithTwoPrimitiveCharFields field3; + ClassWithTwoPrimitiveShortFields field4; + ClassWithTwoPrimitiveIntFields field5; + ClassWithTwoPrimitiveFloatFields field6; + ClassWithTwoPrimitiveLongFields field7; + ClassWithTwoPrimitiveDoubleFields field8; + ClassWithThreeObjectPointers field9; + ClassWithArray field10; + + public ComplexClass() { + } + } +} diff --git 
a/internal/venice-common/src/test/java/com/linkedin/venice/memory/HeapSizeEstimatorTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/memory/HeapSizeEstimatorTest.java new file mode 100644 index 0000000000..7099bf2848 --- /dev/null +++ b/internal/venice-common/src/test/java/com/linkedin/venice/memory/HeapSizeEstimatorTest.java @@ -0,0 +1,292 @@ +package com.linkedin.venice.memory; + +import static com.linkedin.venice.memory.ClassSizeEstimator.getClassOverhead; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.fail; + +import com.linkedin.venice.utils.Utils; +import java.lang.reflect.Constructor; +import java.util.function.Function; +import java.util.function.Supplier; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.DataProvider; + + +public abstract class HeapSizeEstimatorTest { + private static final Logger LOGGER = LogManager.getLogger(HeapSizeEstimatorTest.class); + /** + * Some scenarios are tricky to compute dynamically without just copy-pasting the whole main code, so we just skip it + * for now, though we could come back to it later... + */ + private static final int SKIP_EXPECTED_FIELD_OVERHEAD = -1; + private static final Runtime RUNTIME = Runtime.getRuntime(); + private static final int NUMBER_OF_ALLOCATIONS_WHEN_MEASURING = 200_000; + protected static final int JAVA_MAJOR_VERSION = Utils.getJavaMajorVersion(); + protected static final int BOOLEAN_SIZE = 1; + private static final int ALIGNMENT_SIZE; + protected static final int OBJECT_HEADER_SIZE; + protected static final int ARRAY_HEADER_SIZE; + protected static final int POINTER_SIZE; + + static { + // This duplicates the main code, which is not ideal, but there isn't much choice if we want the test to run in + // various JVM scenarios... + boolean is64bitsJVM = ClassSizeEstimator.is64bitsJVM(); + int markWordSize = is64bitsJVM ? 8 : 4; + boolean isCompressedOopsEnabled = ClassSizeEstimator.isUseCompressedOopsEnabled(); + boolean isCompressedKlassPointersEnabled = ClassSizeEstimator.isCompressedKlassPointersEnabled(); + int classPointerSize = isCompressedKlassPointersEnabled ? 4 : 8; + + ALIGNMENT_SIZE = is64bitsJVM ? 8 : 4; + OBJECT_HEADER_SIZE = markWordSize + classPointerSize; + ARRAY_HEADER_SIZE = roundUpToNearestAlignment(OBJECT_HEADER_SIZE + Integer.BYTES); + POINTER_SIZE = isCompressedOopsEnabled ? 
4 : 8; + } + + private final int[] resultRowCellLengths; + + protected HeapSizeEstimatorTest(Class classWithLongestName) { + this.resultRowCellLengths = new int[] { 6, classWithLongestName.getSimpleName().length(), 9, 9 }; + } + + protected interface TestFunction { + default void test(Class c) { + test(c, SKIP_EXPECTED_FIELD_OVERHEAD, null); + } + + default void test(Class c, Supplier constructor) { + test(c, SKIP_EXPECTED_FIELD_OVERHEAD, constructor); + } + + default void test(Class c, int expectedFieldOverhead) { + test(c, expectedFieldOverhead, null); + } + + void test(Class c, int expectedFieldOverhead, Supplier constructor); + } + + protected enum TestMethodology { + THEORETICAL_EXPECTATION( + o -> o::theoreticalExpectation, new String[] { "status", "class name", "predicted", "expected" } + ), + EMPIRICAL_MEASUREMENT( + o -> o::empiricalClassMeasurement, new String[] { "status", "class name", "predicted", "allocated" } + ); + + final Function tfProvider; + final String[] resultsTableHeader; + + TestMethodology(Function tfProvider, String[] resultsTableHeader) { + this.tfProvider = tfProvider; + this.resultsTableHeader = resultsTableHeader; + } + } + + @DataProvider + public Object[][] testMethodologies() { + return new Object[][] { { TestMethodology.THEORETICAL_EXPECTATION }, { TestMethodology.EMPIRICAL_MEASUREMENT } }; + } + + @BeforeTest + public void preTest() { + System.gc(); + } + + protected void theoreticalExpectation(Class c, int expectedFieldOverhead, Supplier constructor) { + int predictedClassOverhead = getClassOverhead(c); + int expectedClassOverheadWithoutAlignment = OBJECT_HEADER_SIZE + expectedFieldOverhead; + if (expectedFieldOverhead != SKIP_EXPECTED_FIELD_OVERHEAD) { + int expectedClassOverhead = roundUpToNearestAlignment(expectedClassOverheadWithoutAlignment); + boolean success = predictedClassOverhead == expectedClassOverhead; + printResultRow( + status(success, 1, 1), + c.getSimpleName(), + String.valueOf(predictedClassOverhead), + String.valueOf(expectedClassOverhead)); + assertEquals(predictedClassOverhead, expectedClassOverhead); + } + } + + private void empiricalClassMeasurement(Class c, int expectedFieldOverhead, Supplier constructor) { + empiricalMeasurement(c, getClassOverhead(c), constructor); + } + + protected void empiricalMeasurement(Class c, int predictedUsage, Supplier constructor) { + /** + * The reason for having multiple attempts is that the allocation measurement method is not always reliable. + * Presumably, this is because GC could kick in during the middle of the allocation loop. If the allocated memory + * is negative then for sure it's not right. If the GC reduces memory allocated but not enough to make the + * measurement go negative, then we cannot know if it's a measurement error, or a bug... In any case, we will do + * a few attempts and assume that the measurement is good if it falls within the prescribed delta (even though + * technically that could be a false negative). 
+   */
+    int currentAttempt = 0;
+    int totalAttempts = 3;
+    while (currentAttempt++ < totalAttempts) {
+      assertNotEquals(RUNTIME.maxMemory(), Long.MAX_VALUE);
+      Object[] allocations = new Object[NUMBER_OF_ALLOCATIONS_WHEN_MEASURING];
+
+      if (constructor == null) {
+        Class[] argTypes = new Class[0];
+        Object[] args = new Object[0];
+        Constructor reflectiveConstructor;
+        try {
+          reflectiveConstructor = c.getConstructor(argTypes);
+        } catch (NoSuchMethodException e) {
+          fail("Could not get a no-arg constructor for " + c.getSimpleName(), e);
+          throw new RuntimeException(e);
+        }
+
+        constructor = () -> {
+          try {
+            return reflectiveConstructor.newInstance(args);
+          } catch (Exception e) {
+            fail("Could not invoke the no-arg constructor for " + c.getSimpleName(), e);
+          }
+          return null; // Unreachable code, just to appease the compiler
+        };
+      }
+
+      long memoryAllocatedBeforeInstantiations = getCurrentlyAllocatedMemory();
+
+      for (int i = 0; i < NUMBER_OF_ALLOCATIONS_WHEN_MEASURING; i++) {
+        allocations[i] = constructor.get();
+      }
+
+      long memoryAllocatedAfterInstantiations = getCurrentlyAllocatedMemory();
+      long memoryAllocatedByInstantiations = memoryAllocatedAfterInstantiations - memoryAllocatedBeforeInstantiations;
+      if (memoryAllocatedByInstantiations < 0) {
+        String errorMessage = "Memory allocated is negative! memoryAllocatedBeforeInstantiations: "
+            + memoryAllocatedBeforeInstantiations + "; memoryAllocatedAfterInstantiations: "
+            + memoryAllocatedAfterInstantiations + "; memoryAllocatedByInstantiations: "
+            + memoryAllocatedByInstantiations + "; " + (totalAttempts - currentAttempt) + " attempts left.";
+        if (currentAttempt < totalAttempts) {
+          LOGGER.info(errorMessage);
+          continue;
+        } else {
+          fail(errorMessage);
+        }
+      }
+
+      double memoryAllocatedPerInstance =
+          (double) memoryAllocatedByInstantiations / (double) NUMBER_OF_ALLOCATIONS_WHEN_MEASURING;
+
+      for (int i = 0; i < NUMBER_OF_ALLOCATIONS_WHEN_MEASURING; i++) {
+        assertNotNull(allocations[i]);
+      }
+
+      // Since the above method for measuring allocated memory is imperfect, we need to tolerate some delta.
+      double allocatedToPredictedRatio = memoryAllocatedPerInstance / (double) predictedUsage;
+      double delta = Math.abs(1 - allocatedToPredictedRatio);
+
+      // For small objects, any delta of 1 byte or less will be tolerated
+      double minimumAbsoluteDeltaInBytes = 1;
+      double minimumAbsoluteDelta = minimumAbsoluteDeltaInBytes / memoryAllocatedPerInstance;
+
+      // For larger objects, we'll tolerate up to 1% delta
+      double minimumRelativeDelta = 0.01;
+
+      // The larger of the two deltas is the one we use
+      double maxAllowedDelta = Math.max(minimumAbsoluteDelta, minimumRelativeDelta);
+
+      boolean success = delta < maxAllowedDelta;
+
+      printResultRow(
+          status(success, currentAttempt, totalAttempts),
+          c.getSimpleName(),
+          String.valueOf(predictedUsage),
+          String.format("%.3f", memoryAllocatedPerInstance));
+
+      // A best-effort attempt to minimize the chance of needing to GC in the middle of the next measurement run...
+      allocations = null;
+      System.gc();
+
+      if (success) {
+        break; // No more attempts needed if the allocation measurement and all assertions succeeded
+      } else if (currentAttempt == totalAttempts) {
+        fail(
+            "Class " + c.getSimpleName() + " has a memoryAllocatedPerInstance (" + memoryAllocatedPerInstance
+                + ") which is too far from the predictedUsage (" + predictedUsage + "); delta: "
+                + String.format("%.3f", delta) + "; maxAllowedDelta: " + String.format("%.3f", maxAllowedDelta)
+                + ". 
No more attempts left."); + } + } + } + + /** Different algo than the main code because why not? It should be equivalent... */ + protected static int roundUpToNearestAlignment(int size) { + double numberOfAlignmentWindowsFittingWithinTheSize = (double) size / ALIGNMENT_SIZE; + double roundedUp = Math.ceil(numberOfAlignmentWindowsFittingWithinTheSize); + int finalSize = (int) roundedUp * ALIGNMENT_SIZE; + return finalSize; + } + + private static long getCurrentlyAllocatedMemory() { + System.gc(); + return RUNTIME.maxMemory() - RUNTIME.freeMemory(); + } + + @BeforeClass + public void printEnvironmentInfo() { + LOGGER.info("Java major version: " + JAVA_MAJOR_VERSION); + LOGGER.info("Alignment size: " + ALIGNMENT_SIZE); + LOGGER.info("Object header size: " + OBJECT_HEADER_SIZE); + LOGGER.info("Array header size: " + ARRAY_HEADER_SIZE); + LOGGER.info("Pointer size: " + POINTER_SIZE); + } + + protected void printHeader(String... headerCells) { + printResultSeparatorLine(); + printResultRow(headerCells); + printResultSeparatorLine(); + } + + protected void printResultSeparatorLine() { + StringBuilder sb = new StringBuilder(); + sb.append(" "); + for (int i = 0; i < this.resultRowCellLengths.length; i++) { + sb.append("+-"); + for (int j = 0; j < this.resultRowCellLengths[i] + 1; j++) { + sb.append('-'); + } + } + sb.append("+"); + LOGGER.info(sb.toString()); + } + + private static String status(boolean success, int currentAttempt, int totalAttempts) { + String symbol = new String(Character.toChars(success ? 0x2705 : 0x274C)); + if (totalAttempts == 1) { + return symbol; + } + return symbol + " " + currentAttempt + "/" + totalAttempts; + } + + private void printResultRow(String... cells) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < cells.length; i++) { + sb.append(" | "); + String cell = cells[i]; + sb.append(cell); + + int remainder = this.resultRowCellLengths[i] - cell.length(); + for (Character character: cell.toCharArray()) { + if (Character.UnicodeBlock.of(character) != Character.UnicodeBlock.BASIC_LATIN) { + // Emoticons take two characters' width + remainder--; + } + } + + for (int j = 0; j < remainder; j++) { + sb.append(' '); + } + } + sb.append(" |"); + LOGGER.info(sb.toString()); + } +} diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/memory/InstanceSizeEstimatorTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/memory/InstanceSizeEstimatorTest.java new file mode 100644 index 0000000000..095390ca5b --- /dev/null +++ b/internal/venice-common/src/test/java/com/linkedin/venice/memory/InstanceSizeEstimatorTest.java @@ -0,0 +1,152 @@ +package com.linkedin.venice.memory; + +import static com.linkedin.venice.kafka.protocol.enums.MessageType.PUT; + +import com.linkedin.davinci.kafka.consumer.LeaderProducedRecordContext; +import com.linkedin.davinci.kafka.consumer.SBSQueueNodeFactory; +import com.linkedin.venice.kafka.protocol.GUID; +import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; +import com.linkedin.venice.kafka.protocol.LeaderMetadata; +import com.linkedin.venice.kafka.protocol.ProducerMetadata; +import com.linkedin.venice.kafka.protocol.Put; +import com.linkedin.venice.message.KafkaKey; +import com.linkedin.venice.pubsub.ImmutablePubSubMessage; +import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl; +import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pubsub.api.PubSubMessage; +import com.linkedin.venice.pubsub.api.PubSubTopicPartition; +import 
com.linkedin.venice.writer.VeniceWriter; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.function.BiFunction; +import java.util.function.Supplier; +import org.testng.annotations.Test; + + +public class InstanceSizeEstimatorTest extends HeapSizeEstimatorTest { + public InstanceSizeEstimatorTest() { + super(LeaderProducedRecordContext.class); + } + + @Test + public void testInstanceMeasurement() { + printHeader(TestMethodology.EMPIRICAL_MEASUREMENT.resultsTableHeader); + + // Try various array sizes to take alignment into account. + List> kafkaKeySuppliers = new ArrayList<>(); + kafkaKeySuppliers.add(() -> new KafkaKey(PUT, new byte[0])); + kafkaKeySuppliers.add(() -> new KafkaKey(PUT, new byte[2])); + kafkaKeySuppliers.add(() -> new KafkaKey(PUT, new byte[4])); + kafkaKeySuppliers.add(() -> new KafkaKey(PUT, new byte[6])); + kafkaKeySuppliers.add(() -> new KafkaKey(PUT, new byte[8])); + kafkaKeySuppliers.add(() -> new KafkaKey(PUT, new byte[10])); + + for (Supplier kafkaKeySupplier: kafkaKeySuppliers) { + empiricalInstanceMeasurement(KafkaKey.class, kafkaKeySupplier); + } + + Supplier producerMetadataSupplier = () -> new ProducerMetadata(new GUID(), 0, 0, 0L, 0L); + empiricalInstanceMeasurement(ProducerMetadata.class, producerMetadataSupplier); + + Supplier rtPutSupplier = () -> new Put(ByteBuffer.allocate(10), 1, -1, null); + Supplier vtPutSupplier = () -> { + byte[] rawKafkaValue = new byte[50]; + return new Put(ByteBuffer.wrap(rawKafkaValue, 10, 10), 1, 1, ByteBuffer.wrap(rawKafkaValue, 25, 10)); + }; + List> putSuppliers = new ArrayList<>(); + putSuppliers.add(rtPutSupplier); + putSuppliers.add(vtPutSupplier); + for (Supplier putSupplier: putSuppliers) { + empiricalInstanceMeasurement(Put.class, putSupplier); + } + + PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository(); + /** The {@link PubSubTopicPartition} is supposed to be a shared instance, but it cannot be null. */ + PubSubTopicPartition pubSubTopicPartition = + new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("topic"), 0); + + VeniceWriter.DefaultLeaderMetadata defaultLeaderMetadata = new VeniceWriter.DefaultLeaderMetadata("blah"); + + List> kmeSuppliers = new ArrayList<>(); + Supplier rtKmeSupplier = () -> new KafkaMessageEnvelope( + // What a message in the RT topic might look like + PUT.getValue(), + producerMetadataSupplier.get(), + rtPutSupplier.get(), + // The (improperly-named) "leader" metadata footer is always populated, but in this path it points to a + // static instance. + defaultLeaderMetadata); + Supplier vtKmeSupplier = () -> new KafkaMessageEnvelope( + PUT.getValue(), + producerMetadataSupplier.get(), + vtPutSupplier.get(), + new LeaderMetadata( + null, // shared instance + 0L, + 0)); + kmeSuppliers.add(rtKmeSupplier); + kmeSuppliers.add(vtKmeSupplier); + // TODO: Add updates, deletes... 
+ + for (Supplier kmeSupplier: kmeSuppliers) { + empiricalInstanceMeasurement(KafkaMessageEnvelope.class, kmeSupplier); + } + + BiFunction, Supplier, PubSubMessage> psmProvider = + (kafkaKeySupplier, kmeSupplier) -> new ImmutablePubSubMessage<>( + kafkaKeySupplier.get(), + kmeSupplier.get(), + pubSubTopicPartition, + 0, + 0, + 0); + + for (Supplier kafkaKeySupplier: kafkaKeySuppliers) { + for (Supplier kmeSupplier: kmeSuppliers) { + empiricalInstanceMeasurement( + ImmutablePubSubMessage.class, + () -> psmProvider.apply(kafkaKeySupplier, kmeSupplier)); + } + } + + for (Supplier kafkaKeySupplier: kafkaKeySuppliers) { + for (Supplier kmeSupplier: kmeSuppliers) { + empiricalInstanceMeasurement( + SBSQueueNodeFactory.queueNodeClass(), + () -> SBSQueueNodeFactory.queueNode(psmProvider.apply(kafkaKeySupplier, kmeSupplier), null, null, 0)); + } + } + + int kafkaClusterId = 0; + Supplier leaderProducedRecordContextSupplierForPut = + () -> LeaderProducedRecordContext.newPutRecord(kafkaClusterId, 0, new byte[10], vtPutSupplier.get()); + List> leaderProducedRecordContextSuppliers = new ArrayList<>(); + leaderProducedRecordContextSuppliers.add(leaderProducedRecordContextSupplierForPut); + + for (Supplier leaderProducedRecordContextSupplier: leaderProducedRecordContextSuppliers) { + empiricalInstanceMeasurement(LeaderProducedRecordContext.class, leaderProducedRecordContextSupplier); + } + + for (Supplier kafkaKeySupplier: kafkaKeySuppliers) { + for (Supplier leaderProducedRecordContextSupplier: leaderProducedRecordContextSuppliers) { + empiricalInstanceMeasurement( + SBSQueueNodeFactory.leaderQueueNodeClass(), + () -> SBSQueueNodeFactory.leaderQueueNode( + psmProvider.apply(kafkaKeySupplier, vtKmeSupplier), + null, + null, + 0, + leaderProducedRecordContextSupplier.get())); + } + } + + printResultSeparatorLine(); + } + + private void empiricalInstanceMeasurement(Class c, Supplier constructor) { + Object o = constructor.get(); + int expectedSize = InstanceSizeEstimator.getObjectSize(o); + empiricalMeasurement(c, expectedSize, constructor); + } +} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/PubSubProducerCallbackSimpleImpl.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/PubSubProducerCallbackSimpleImpl.java similarity index 100% rename from internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/PubSubProducerCallbackSimpleImpl.java rename to internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/PubSubProducerCallbackSimpleImpl.java diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/ApacheKafkaUtilsTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/ApacheKafkaUtilsTest.java index a9f3ccd86b..ad19ceafbb 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/ApacheKafkaUtilsTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/ApacheKafkaUtilsTest.java @@ -2,12 +2,16 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertSame; +import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; +import com.linkedin.venice.pubsub.api.EmptyPubSubMessageHeaders; import com.linkedin.venice.pubsub.api.PubSubMessageHeader; import com.linkedin.venice.pubsub.api.PubSubMessageHeaders; import java.util.LinkedHashMap; import java.util.Map; +import 
java.util.function.Supplier; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeaders; import org.testng.annotations.Test; @@ -15,10 +19,22 @@ public class ApacheKafkaUtilsTest { @Test - public void testConvertToKafkaSpecificHeadersWhenPubsubMessageHeadersIsNull() { - RecordHeaders recordHeaders = ApacheKafkaUtils.convertToKafkaSpecificHeaders(null); - assertNotNull(recordHeaders); - assertEquals(recordHeaders.toArray().length, 0); + public void testConvertToKafkaSpecificHeadersWhenPubsubMessageHeadersIsNullOrEmpty() { + Supplier[] suppliers = + new Supplier[] { () -> null, () -> new PubSubMessageHeaders(), () -> EmptyPubSubMessageHeaders.SINGLETON }; + for (Supplier supplier: suppliers) { + RecordHeaders recordHeaders = ApacheKafkaUtils.convertToKafkaSpecificHeaders(supplier.get()); + assertNotNull(recordHeaders); + assertEquals(recordHeaders.toArray().length, 0); + assertThrows(() -> recordHeaders.add("foo", "bar".getBytes())); + + RecordHeaders recordHeaders2 = ApacheKafkaUtils.convertToKafkaSpecificHeaders(supplier.get()); + assertNotNull(recordHeaders2); + assertEquals(recordHeaders2.toArray().length, 0); + assertThrows(() -> recordHeaders2.add("foo", "bar".getBytes())); + + assertSame(recordHeaders, recordHeaders2); + } } @Test diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubProducerAdapterConcurrentDelegatorTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubProducerAdapterConcurrentDelegatorTest.java index b9ca3f20df..07f84d26ce 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubProducerAdapterConcurrentDelegatorTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubProducerAdapterConcurrentDelegatorTest.java @@ -85,7 +85,7 @@ public void testProducerQueueNode() { new PubSubMessageHeaders(), null, null); - assertTrue(node.getSize() > 0); + assertTrue(node.getHeapSize() > 0); } @Test diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/utils/DictionaryUtilsTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/utils/DictionaryUtilsTest.java index 55b2846a9f..2f587671cd 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/utils/DictionaryUtilsTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/utils/DictionaryUtilsTest.java @@ -46,7 +46,7 @@ public void testGetDictionary() { PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(topic, 0); doReturn(topic).when(pubSubTopicRepository).getTopic(topic.getName()); - KafkaKey controlMessageKey = new KafkaKey(MessageType.CONTROL_MESSAGE, null); + KafkaKey controlMessageKey = new KafkaKey(MessageType.CONTROL_MESSAGE, new byte[0]); StartOfPush startOfPush = new StartOfPush(); startOfPush.compressionStrategy = CompressionStrategy.ZSTD_WITH_DICT.getValue(); startOfPush.compressionDictionary = ByteBuffer.wrap(dictionaryToSend); @@ -77,7 +77,7 @@ public void testGetDictionaryReturnsNullWhenNoDictionary() { PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(topic, 0); doReturn(topic).when(pubSubTopicRepository).getTopic(topic.getName()); - KafkaKey controlMessageKey = new KafkaKey(MessageType.CONTROL_MESSAGE, null); + KafkaKey controlMessageKey = new KafkaKey(MessageType.CONTROL_MESSAGE, new byte[0]); StartOfPush startOfPush = new StartOfPush(); ControlMessage sopCM = new ControlMessage(); @@ -134,7 +134,7 @@ public void 
diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubProducerAdapterConcurrentDelegatorTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubProducerAdapterConcurrentDelegatorTest.java
index b9ca3f20df..07f84d26ce 100644
--- a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubProducerAdapterConcurrentDelegatorTest.java
+++ b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubProducerAdapterConcurrentDelegatorTest.java
@@ -85,7 +85,7 @@ public void testProducerQueueNode() {
         new PubSubMessageHeaders(),
         null,
         null);
-    assertTrue(node.getSize() > 0);
+    assertTrue(node.getHeapSize() > 0);
   }
 
   @Test
diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/utils/DictionaryUtilsTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/utils/DictionaryUtilsTest.java
index 55b2846a9f..2f587671cd 100644
--- a/internal/venice-common/src/test/java/com/linkedin/venice/utils/DictionaryUtilsTest.java
+++ b/internal/venice-common/src/test/java/com/linkedin/venice/utils/DictionaryUtilsTest.java
@@ -46,7 +46,7 @@ public void testGetDictionary() {
     PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(topic, 0);
     doReturn(topic).when(pubSubTopicRepository).getTopic(topic.getName());
 
-    KafkaKey controlMessageKey = new KafkaKey(MessageType.CONTROL_MESSAGE, null);
+    KafkaKey controlMessageKey = new KafkaKey(MessageType.CONTROL_MESSAGE, new byte[0]);
     StartOfPush startOfPush = new StartOfPush();
     startOfPush.compressionStrategy = CompressionStrategy.ZSTD_WITH_DICT.getValue();
     startOfPush.compressionDictionary = ByteBuffer.wrap(dictionaryToSend);
@@ -77,7 +77,7 @@ public void testGetDictionaryReturnsNullWhenNoDictionary() {
     PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(topic, 0);
     doReturn(topic).when(pubSubTopicRepository).getTopic(topic.getName());
 
-    KafkaKey controlMessageKey = new KafkaKey(MessageType.CONTROL_MESSAGE, null);
+    KafkaKey controlMessageKey = new KafkaKey(MessageType.CONTROL_MESSAGE, new byte[0]);
     StartOfPush startOfPush = new StartOfPush();
 
     ControlMessage sopCM = new ControlMessage();
@@ -134,7 +134,7 @@ public void testGetDictionaryWaitsTillTopicHasRecords() {
     PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(topic, 0);
     doReturn(topic).when(pubSubTopicRepository).getTopic(topic.getName());
 
-    KafkaKey controlMessageKey = new KafkaKey(MessageType.CONTROL_MESSAGE, null);
+    KafkaKey controlMessageKey = new KafkaKey(MessageType.CONTROL_MESSAGE, new byte[0]);
     StartOfPush startOfPush = new StartOfPush();
     startOfPush.compressionStrategy = CompressionStrategy.ZSTD_WITH_DICT.getValue();
     startOfPush.compressionDictionary = ByteBuffer.wrap(dictionaryToSend);
diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/utils/collections/MemoryBoundBlockingQueueTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/utils/collections/MemoryBoundBlockingQueueTest.java
index a390dbfdba..e4286a2abb 100644
--- a/internal/venice-common/src/test/java/com/linkedin/venice/utils/collections/MemoryBoundBlockingQueueTest.java
+++ b/internal/venice-common/src/test/java/com/linkedin/venice/utils/collections/MemoryBoundBlockingQueueTest.java
@@ -1,6 +1,6 @@
 package com.linkedin.venice.utils.collections;
 
-import com.linkedin.venice.common.Measurable;
+import com.linkedin.venice.memory.Measurable;
 import com.linkedin.venice.utils.TestUtils;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -13,7 +13,7 @@ private static class MeasurableObject implements Measurable {
     public static final int SIZE = 10;
 
     @Override
-    public int getSize() {
+    public int getHeapSize() {
       return SIZE;
     }
   }
@@ -23,7 +23,7 @@ public void testPut() throws InterruptedException {
     int memoryCap = 5000;
     MemoryBoundBlockingQueue<MeasurableObject> queue = new MemoryBoundBlockingQueue<>(memoryCap, 1000);
     int objectCntAtMost =
-        memoryCap / (MemoryBoundBlockingQueue.LINKED_QUEUE_NODE_OVERHEAD_IN_BYTE + MeasurableObject.SIZE);
+        memoryCap / (MemoryBoundBlockingQueue.LINKED_LIST_NODE_SHALLOW_OVERHEAD + MeasurableObject.SIZE);
     Thread t = new Thread(() -> {
       while (true) {
         try {
@@ -48,7 +48,7 @@ public void testTake() throws InterruptedException {
     int memoryCap = 5000;
     MemoryBoundBlockingQueue<MeasurableObject> queue = new MemoryBoundBlockingQueue<>(memoryCap, 1000);
     int objectCntAtMost =
-        memoryCap / (MemoryBoundBlockingQueue.LINKED_QUEUE_NODE_OVERHEAD_IN_BYTE + MeasurableObject.SIZE);
+        memoryCap / (MemoryBoundBlockingQueue.LINKED_LIST_NODE_SHALLOW_OVERHEAD + MeasurableObject.SIZE);
     for (int i = 0; i < objectCntAtMost; ++i) {
       queue.put(new MeasurableObject());
     }
@@ -81,7 +81,7 @@ public void testThrottling() throws InterruptedException {
     int memoryCap = 5000;
     int notifyDelta = 1000;
     MemoryBoundBlockingQueue<MeasurableObject> queue = new MemoryBoundBlockingQueue<>(memoryCap, notifyDelta);
     int objectCntAtMost =
-        memoryCap / (MemoryBoundBlockingQueue.LINKED_QUEUE_NODE_OVERHEAD_IN_BYTE + MeasurableObject.SIZE);
+        memoryCap / (MemoryBoundBlockingQueue.LINKED_LIST_NODE_SHALLOW_OVERHEAD + MeasurableObject.SIZE);
     Thread t = new Thread(() -> {
       while (true) {
         try {
@@ -102,7 +102,7 @@
     int previousQueueSize = queue.size();
     // Here we need to take out some objects to allow more put
     double objectCntTakenAtLeast = Math.ceil(
-        (double) notifyDelta / (MemoryBoundBlockingQueue.LINKED_QUEUE_NODE_OVERHEAD_IN_BYTE + MeasurableObject.SIZE));
+        (double) notifyDelta / (MemoryBoundBlockingQueue.LINKED_LIST_NODE_SHALLOW_OVERHEAD + MeasurableObject.SIZE));
     for (int i = 1; i < objectCntTakenAtLeast; ++i) {
       queue.take();
       Assert.assertEquals(queue.size(), previousQueueSize - 1);
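[Editor's note] The constant rename above reflects that the per-entry overhead is now the estimated shallow size of the queue's linked-list node rather than a hand-maintained figure. A minimal usage sketch of the accounting these tests exercise, using only APIs visible in this patch (the element's 10-byte heap size is an illustrative figure):

import com.linkedin.venice.memory.Measurable;
import com.linkedin.venice.utils.collections.MemoryBoundBlockingQueue;

public class QueueAccountingSketch {
  static class TenByteItem implements Measurable {
    @Override
    public int getHeapSize() {
      return 10; // bytes charged against the cap, on top of the per-node overhead
    }
  }

  public static void main(String[] args) throws InterruptedException {
    int memoryCap = 5000;
    MemoryBoundBlockingQueue<TenByteItem> queue = new MemoryBoundBlockingQueue<>(memoryCap, 1000);
    // Each entry costs LINKED_LIST_NODE_SHALLOW_OVERHEAD + getHeapSize() bytes,
    // so this many puts fit before a subsequent put() would block:
    int objectCntAtMost = memoryCap / (MemoryBoundBlockingQueue.LINKED_LIST_NODE_SHALLOW_OVERHEAD + 10);
    for (int i = 0; i < objectCntAtMost; i++) {
      queue.put(new TenByteItem());
    }
  }
}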
diff --git a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/PubSubHelper.java b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/PubSubHelper.java
index ba268eb134..1fe429d10c 100644
--- a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/PubSubHelper.java
+++ b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/PubSubHelper.java
@@ -168,5 +168,10 @@ public long getTimestampBeforeProduce() {
     public long getTimestampAfterProduce() {
       return timestampAfterProduce;
     }
+
+    @Override
+    public int getHeapSize() {
+      throw new UnsupportedOperationException("getHeapSize is not supported on " + this.getClass().getSimpleName());
+    }
   }
 }
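[Editor's note] As this last hunk shows, test-only Measurable implementations can opt out of heap accounting explicitly rather than report a misleading number. For contrast, a hypothetical production-style implementation returns a real estimate; the 16-byte figures below are assumed layout constants for illustration, not values from this patch:

import com.linkedin.venice.memory.Measurable;

public class MeasuredEntry implements Measurable {
  private static final int ASSUMED_SHALLOW_SIZE = 16; // object header + one reference, assumed layout
  private final byte[] payload;

  public MeasuredEntry(byte[] payload) {
    this.payload = payload;
  }

  @Override
  public int getHeapSize() {
    // Count the owned array (assumed 16-byte array header), but not shared
    // singletons, whose amortized cost is negligible per the commit message.
    return ASSUMED_SHALLOW_SIZE + 16 + this.payload.length;
  }
}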