diff --git a/build.gradle b/build.gradle index 9f4fa4e3fa..5c91d3b508 100644 --- a/build.gradle +++ b/build.gradle @@ -369,7 +369,8 @@ subprojects { } classes = classDirs.asFileTree.matching { exclude generatedClasses } auxClassPaths += classDirs.asFileTree.matching { include generatedClasses }.each { - println "Excluding generated class ${project.relativePath(it)}" + // Muted to reduce noise, but can be uncommented to debug exclusions + // println "Excluding generated class ${project.relativePath(it)}" } } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ImmutableChangeCapturePubSubMessage.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ImmutableChangeCapturePubSubMessage.java index e6122df533..75bed9b6e9 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ImmutableChangeCapturePubSubMessage.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ImmutableChangeCapturePubSubMessage.java @@ -75,4 +75,9 @@ public String toString() { return "PubSubMessage{" + topicPartition + ", offset=" + offset + ", timestamp=" + timestamp + ", isEndOfBootstrap=" + isEndOfBootstrap + '}'; } + + @Override + public int getHeapSize() { + throw new UnsupportedOperationException("getHeapSize is not supported on " + this.getClass().getSimpleName()); + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index 76042645a2..a09a201c58 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -2154,7 +2154,7 @@ protected void getAndUpdateLeaderCompletedState( && Arrays.equals(kafkaKey.getKey(), KafkaKey.HEART_BEAT.getKey())) { LeaderCompleteState oldState = partitionConsumptionState.getLeaderCompleteState(); LeaderCompleteState newState = oldState; - for (PubSubMessageHeader header: pubSubMessageHeaders.toList()) { + for (PubSubMessageHeader header: pubSubMessageHeaders) { if (header.key().equals(VENICE_LEADER_COMPLETION_STATE_HEADER)) { newState = LeaderCompleteState.valueOf(header.value()[0]); partitionConsumptionState diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducedRecordContext.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducedRecordContext.java index 4cff79e933..75ce762780 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducedRecordContext.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderProducedRecordContext.java @@ -3,11 +3,14 @@ import static com.linkedin.venice.kafka.protocol.enums.MessageType.CONTROL_MESSAGE; import static com.linkedin.venice.kafka.protocol.enums.MessageType.DELETE; import static com.linkedin.venice.kafka.protocol.enums.MessageType.PUT; +import static com.linkedin.venice.memory.ClassSizeEstimator.getClassOverhead; +import static com.linkedin.venice.memory.InstanceSizeEstimator.getSize; import com.linkedin.venice.kafka.protocol.ControlMessage; import com.linkedin.venice.kafka.protocol.Delete; import com.linkedin.venice.kafka.protocol.Put; import com.linkedin.venice.kafka.protocol.enums.MessageType; +import 
com.linkedin.venice.memory.Measurable; import java.util.concurrent.CompletableFuture; @@ -25,7 +28,9 @@ * drainer thread completes the persistedToDBFuture. */ -public class LeaderProducedRecordContext { +public class LeaderProducedRecordContext implements Measurable { + private static final int PARTIAL_CLASS_OVERHEAD = + getClassOverhead(LeaderProducedRecordContext.class) + getClassOverhead(CompletableFuture.class); private static final int NO_UPSTREAM = -1; /** * Kafka cluster ID where the source kafka consumer record was consumed from. @@ -235,4 +240,26 @@ private static void checkConsumedOffsetParam(long consumedOffset) { throw new IllegalArgumentException("consumedOffset cannot be negative"); } } + + @Override + public int getHeapSize() { + int size = PARTIAL_CLASS_OVERHEAD + getSize(this.keyBytes); + switch (this.messageType) { + case PUT: + size += getSize((Put) this.valueUnion); + break; + case CONTROL_MESSAGE: + size += getSize((ControlMessage) this.valueUnion); + break; + default: + /** + * Only the above two cases contribute any size. + * + * {@link DELETE} contributes nothing, and {@link com.linkedin.venice.kafka.protocol.enums.MessageType.UPDATE} + * should never happen. + */ + break; + } + return size; + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreBufferService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreBufferService.java index 5e26b3a235..8f47975866 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreBufferService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreBufferService.java @@ -1,16 +1,17 @@ package com.linkedin.davinci.kafka.consumer; -import static com.linkedin.venice.utils.ProtocolUtils.getEstimateOfMessageEnvelopeSizeOnHeap; +import static com.linkedin.venice.memory.ClassSizeEstimator.getClassOverhead; import static java.util.Collections.reverseOrder; import static java.util.Comparator.comparing; import static java.util.stream.Collectors.toList; import com.linkedin.davinci.stats.StoreBufferServiceStats; import com.linkedin.davinci.utils.LockAssistedCompletableFuture; -import com.linkedin.venice.common.Measurable; import com.linkedin.venice.exceptions.VeniceChecksumException; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; +import com.linkedin.venice.memory.ClassSizeEstimator; +import com.linkedin.venice.memory.Measurable; import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.pubsub.api.PubSubMessage; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; @@ -71,42 +72,53 @@ public StoreBufferService( boolean queueLeaderWrites, MetricsRepository metricsRepository, boolean sorted) { - this.drainerNum = drainerNum; - this.blockingQueueArr = new ArrayList<>(); - this.bufferCapacityPerDrainer = bufferCapacityPerDrainer; - for (int cur = 0; cur < drainerNum; ++cur) { - this.blockingQueueArr.add(new MemoryBoundBlockingQueue<>(bufferCapacityPerDrainer, bufferNotifyDelta)); - } - this.isSorted = sorted; - this.leaderRecordHandler = queueLeaderWrites ? this::queueLeaderRecord : StoreBufferService::processRecord; - String metricNamePrefix = sorted ? 
"StoreBufferServiceSorted" : "StoreBufferServiceUnsorted"; - this.storeBufferServiceStats = new StoreBufferServiceStats( - metricsRepository, - metricNamePrefix, - this::getTotalMemoryUsage, - this::getTotalRemainingMemory, - this::getMaxMemoryUsagePerDrainer, - this::getMinMemoryUsagePerDrainer); + this(drainerNum, bufferCapacityPerDrainer, bufferNotifyDelta, queueLeaderWrites, null, metricsRepository, sorted); } /** - * Constructor for testing + * Package-private constructor for testing */ - public StoreBufferService( + StoreBufferService( int drainerNum, long bufferCapacityPerDrainer, long bufferNotifyDelta, boolean queueLeaderWrites, StoreBufferServiceStats stats) { + this(drainerNum, bufferCapacityPerDrainer, bufferNotifyDelta, queueLeaderWrites, stats, null, true); + } + + /** + * Shared code for the main and test constructors. + * + * N.B.: Either {@param stats} or {@param metricsRepository} should be null, but not both. If neither are null, then + * we default to the main code's expected path, meaning that the metric repo will be used to construct a + * {@link StoreBufferServiceStats} instance, and the passed in stats object will be ignored. + */ + private StoreBufferService( + int drainerNum, + long bufferCapacityPerDrainer, + long bufferNotifyDelta, + boolean queueLeaderWrites, + StoreBufferServiceStats stats, + MetricsRepository metricsRepository, + boolean sorted) { this.drainerNum = drainerNum; this.blockingQueueArr = new ArrayList<>(); this.bufferCapacityPerDrainer = bufferCapacityPerDrainer; for (int cur = 0; cur < drainerNum; ++cur) { this.blockingQueueArr.add(new MemoryBoundBlockingQueue<>(bufferCapacityPerDrainer, bufferNotifyDelta)); } + this.isSorted = sorted; this.leaderRecordHandler = queueLeaderWrites ? this::queueLeaderRecord : StoreBufferService::processRecord; - this.storeBufferServiceStats = stats; - this.isSorted = true; + this.storeBufferServiceStats = metricsRepository == null + ? Objects.requireNonNull(stats) + : new StoreBufferServiceStats( + Objects.requireNonNull(metricsRepository), + sorted ? "StoreBufferServiceSorted" : "StoreBufferServiceUnsorted", + this::getTotalMemoryUsage, + this::getTotalRemainingMemory, + this::getMaxMemoryUsagePerDrainer, + this::getMinMemoryUsagePerDrainer); } protected MemoryBoundBlockingQueue getDrainerForConsumerRecord( @@ -378,11 +390,8 @@ public long getMinMemoryUsagePerDrainer() { /** * Queue node type in {@link BlockingQueue} of each drainer thread. */ - private static class QueueNode implements Measurable { - /** - * Considering the overhead of {@link PubSubMessage} and its internal structures. - */ - private static final int QUEUE_NODE_OVERHEAD_IN_BYTE = 256; + static class QueueNode implements Measurable { + private static final int SHALLOW_CLASS_OVERHEAD = ClassSizeEstimator.getClassOverhead(QueueNode.class); private final PubSubMessage consumerRecord; private final StoreIngestionTask ingestionTask; private final String kafkaUrl; @@ -447,15 +456,14 @@ public int hashCode() { return consumerRecord.hashCode(); } + protected int getBaseClassOverhead() { + return SHALLOW_CLASS_OVERHEAD; + } + @Override - public int getSize() { - // For FakePubSubMessage, the key and the value are null. - if (consumerRecord instanceof FakePubSubMessage) { - return QUEUE_NODE_OVERHEAD_IN_BYTE; - } - // N.B.: This is just an estimate. TODO: Consider if it is really useful, and whether to get rid of it. 
- return this.consumerRecord.getKey().getEstimatedObjectSizeOnHeap() - + getEstimateOfMessageEnvelopeSizeOnHeap(this.consumerRecord.getValue()) + QUEUE_NODE_OVERHEAD_IN_BYTE; + public int getHeapSize() { + /** The other non-primitive fields point to shared instances and are therefore ignored. */ + return getBaseClassOverhead() + consumerRecord.getHeapSize(); } @Override @@ -465,6 +473,13 @@ public String toString() { } private static class FollowerQueueNode extends QueueNode { + /** + * N.B.: We don't want to recurse fully into the {@link CompletableFuture}, but we do want to take into account an + * "empty" one. + */ + private static final int PARTIAL_CLASS_OVERHEAD = + getClassOverhead(FollowerQueueNode.class) + getClassOverhead(CompletableFuture.class); + private final CompletableFuture queuedRecordPersistedFuture; public FollowerQueueNode( @@ -491,9 +506,16 @@ public int hashCode() { public boolean equals(Object o) { return super.equals(o); } + + @Override + protected int getBaseClassOverhead() { + return PARTIAL_CLASS_OVERHEAD; + } } - private static class LeaderQueueNode extends QueueNode { + static class LeaderQueueNode extends QueueNode { + private static final int SHALLOW_CLASS_OVERHEAD = ClassSizeEstimator.getClassOverhead(LeaderQueueNode.class); + private final LeaderProducedRecordContext leaderProducedRecordContext; public LeaderQueueNode( @@ -520,9 +542,21 @@ public int hashCode() { public boolean equals(Object o) { return super.equals(o); } + + @Override + protected int getBaseClassOverhead() { + return SHALLOW_CLASS_OVERHEAD + leaderProducedRecordContext.getHeapSize(); + } } private static class CommandQueueNode extends QueueNode { + /** + * N.B.: We don't want to recurse fully into the {@link CompletableFuture}, but we do want to take into account an + * "empty" one. + */ + private static final int PARTIAL_CLASS_OVERHEAD = + getClassOverhead(CommandQueueNode.class) + getClassOverhead(LockAssistedCompletableFuture.class); + enum CommandType { // only supports SYNC_OFFSET command today. SYNC_OFFSET @@ -589,6 +623,10 @@ public int hashCode() { public boolean equals(Object o) { return super.equals(o); } + + protected int getBaseClassOverhead() { + return PARTIAL_CLASS_OVERHEAD; + } } /** @@ -714,6 +752,7 @@ public void run() { } private static class FakePubSubMessage implements PubSubMessage { + private static final int SHALLOW_CLASS_OVERHEAD = ClassSizeEstimator.getClassOverhead(FakePubSubMessage.class); private final PubSubTopicPartition topicPartition; FakePubSubMessage(PubSubTopicPartition topicPartition) { @@ -754,5 +793,11 @@ public int getPayloadSize() { public boolean isEndOfBootstrap() { return false; } + + @Override + public int getHeapSize() { + /** We assume that {@link #topicPartition} is a singleton instance, and therefore we're not counting it. 
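+       * This mirrors the convention in {@link com.linkedin.venice.pubsub.ImmutablePubSubMessage#getHeapSize()},
+       * which likewise treats the topic-partition as a shared instance and skips it.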
*/ + return SHALLOW_CLASS_OVERHEAD; + } } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManagerTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManagerTest.java index 08ebe35dd4..b30cdd2f4c 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManagerTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/BlobSnapshotManagerTest.java @@ -10,6 +10,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertTrue; import com.linkedin.davinci.storage.StorageEngineRepository; import com.linkedin.davinci.storage.StorageMetadataService; @@ -19,12 +20,14 @@ import com.linkedin.venice.meta.ReadOnlyStoreRepository; import com.linkedin.venice.meta.Store; import com.linkedin.venice.store.rocksdb.RocksDBUtils; +import com.linkedin.venice.utils.Time; import com.linkedin.venice.utils.Utils; import java.io.File; import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; import org.mockito.MockedStatic; import org.mockito.Mockito; @@ -36,6 +39,7 @@ public class BlobSnapshotManagerTest { + private static final int TIMEOUT = 30 * Time.MS_PER_SECOND; private static final String STORE_NAME = "test-store"; private static final int VERSION_ID = 1; private static final String TOPIC_NAME = STORE_NAME + "_v" + VERSION_ID; @@ -51,7 +55,7 @@ public class BlobSnapshotManagerTest { private static final BlobTransferPayload blobTransferPayload = new BlobTransferPayload(BASE_PATH, STORE_NAME, VERSION_ID, PARTITION_ID); - @Test + @Test(timeOut = TIMEOUT) public void testHybridSnapshot() { AbstractStorageEngine storageEngine = Mockito.mock(AbstractStorageEngine.class); Mockito.doReturn(storageEngine).when(storageEngineRepository).getLocalStorageEngine(TOPIC_NAME); @@ -76,7 +80,7 @@ public void testHybridSnapshot() { Assert.assertEquals(actualBlobTransferPartitionMetadata, blobTransferPartitionMetadata); } - @Test + @Test(timeOut = TIMEOUT) public void testSameSnapshotWhenConcurrentUsersNotExceedMaxAllowedUsers() { Store mockStore = mock(Store.class); @@ -105,7 +109,7 @@ public void testSameSnapshotWhenConcurrentUsersNotExceedMaxAllowedUsers() { Assert.assertEquals(actualBlobTransferPartitionMetadata, blobTransferPartitionMetadata); } - @Test + @Test(timeOut = TIMEOUT) public void testSameSnapshotWhenConcurrentUsersExceedsMaxAllowedUsers() { Store mockStore = mock(Store.class); @@ -145,7 +149,7 @@ public void testSameSnapshotWhenConcurrentUsersExceedsMaxAllowedUsers() { BlobSnapshotManager.DEFAULT_MAX_CONCURRENT_USERS); } - @Test + @Test(timeOut = TIMEOUT) public void testTwoRequestUsingSameOffset() { // Prepare Store mockStore = mock(Store.class); @@ -180,8 +184,8 @@ public void testTwoRequestUsingSameOffset() { blobTransferPartitionMetadata); } - @Test - public void testMultipleThreads() { + @Test(timeOut = TIMEOUT) + public void testMultipleThreads() throws InterruptedException { final int numberOfThreads = 2; final ExecutorService asyncExecutor = Executors.newFixedThreadPool(numberOfThreads); final CountDownLatch latch = new CountDownLatch(numberOfThreads); @@ -218,10 +222,12 @@ public void testMultipleThreads() { Assert.assertEquals(e.getMessage(), errorMessage); } + 
assertTrue(latch.await(TIMEOUT / 2, TimeUnit.MILLISECONDS)); + Assert.assertEquals(blobSnapshotManager.getConcurrentSnapshotUsers(TOPIC_NAME, PARTITION_ID), 0); } - @Test + @Test(timeOut = TIMEOUT) public void testCreateSnapshotForBatch() throws RocksDBException { try (MockedStatic checkpointMockedStatic = Mockito.mockStatic(Checkpoint.class)) { try (MockedStatic fileUtilsMockedStatic = Mockito.mockStatic(FileUtils.class)) { diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java index 9be396e7c2..1ce9115ea8 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java @@ -537,7 +537,7 @@ private PubSubMessage constructVersionSwap PubSubTopic newTopic, int partition, List localHighWatermarks) { - KafkaKey kafkaKey = new KafkaKey(MessageType.CONTROL_MESSAGE, null); + KafkaKey kafkaKey = new KafkaKey(MessageType.CONTROL_MESSAGE, new byte[0]); VersionSwap versionSwapMessage = new VersionSwap(); versionSwapMessage.oldServingVersionTopic = oldTopic.getName(); versionSwapMessage.newServingVersionTopic = newTopic.getName(); @@ -616,7 +616,7 @@ private PubSubMessage constructEndOfPushMe PubSubTopic versionTopic, int partition, Long offset) { - KafkaKey kafkaKey = new KafkaKey(MessageType.CONTROL_MESSAGE, null); + KafkaKey kafkaKey = new KafkaKey(MessageType.CONTROL_MESSAGE, new byte[0]); EndOfPush endOfPush = new EndOfPush(); KafkaMessageEnvelope kafkaMessageEnvelope = new KafkaMessageEnvelope(); ProducerMetadata producerMetadata = new ProducerMetadata(); @@ -633,7 +633,7 @@ private PubSubMessage constructEndOfPushMe private PubSubMessage constructStartOfPushMessage( PubSubTopic versionTopic, int partition) { - KafkaKey kafkaKey = new KafkaKey(MessageType.CONTROL_MESSAGE, null); + KafkaKey kafkaKey = new KafkaKey(MessageType.CONTROL_MESSAGE, new byte[0]); StartOfPush startOfPush = new StartOfPush(); startOfPush.compressionStrategy = CompressionStrategy.NO_OP.getValue(); KafkaMessageEnvelope kafkaMessageEnvelope = new KafkaMessageEnvelope(); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreBufferServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreBufferServiceTest.java index d8d2f383c2..0d9bb3e4dd 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreBufferServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreBufferServiceTest.java @@ -39,7 +39,7 @@ public class StoreBufferServiceTest { private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository(); - private final KafkaKey key = new KafkaKey(MessageType.PUT, null); + private final KafkaKey key = new KafkaKey(MessageType.PUT, new byte[0]); private final Put put = new Put(ByteBuffer.allocate(0), 0, 0, ByteBuffer.allocate(0)); private final KafkaMessageEnvelope value = new KafkaMessageEnvelope(MessageType.PUT.getValue(), new ProducerMetadata(), put, null); diff --git a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminToolConsumption.java b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminToolConsumption.java index bdf2258950..b4c47e5568 100644 --- 
a/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminToolConsumption.java +++ b/clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestAdminToolConsumption.java @@ -156,7 +156,7 @@ public void testAdminToolConsumption() { new ImmutablePubSubMessage<>(kafkaKey, messageEnvelope, pubSubTopicPartition, 0, 0, 20); PubSubMessage pubSubMessage2 = new ImmutablePubSubMessage<>(kafkaKey, messageEnvelope2, pubSubTopicPartition, 1, 0, 10); - KafkaKey kafkaControlMessageKey = new KafkaKey(MessageType.CONTROL_MESSAGE, null); + KafkaKey kafkaControlMessageKey = new KafkaKey(MessageType.CONTROL_MESSAGE, new byte[0]); EndOfPush endOfPush = new EndOfPush(); KafkaMessageEnvelope kafkaMessageEnvelope = new KafkaMessageEnvelope(); kafkaMessageEnvelope.messageType = MessageType.CONTROL_MESSAGE.getValue(); diff --git a/gradle/spotbugs/exclude.xml b/gradle/spotbugs/exclude.xml index 40d69bb0be..3725224437 100644 --- a/gradle/spotbugs/exclude.xml +++ b/gradle/spotbugs/exclude.xml @@ -55,12 +55,27 @@ + + + + + + + + + + + + + + + @@ -189,6 +204,12 @@ + + + + + + @@ -272,6 +293,12 @@ + + + + + + @@ -297,6 +324,10 @@ + + + + diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/common/Measurable.java b/internal/venice-common/src/main/java/com/linkedin/venice/common/Measurable.java deleted file mode 100644 index d76f582722..0000000000 --- a/internal/venice-common/src/main/java/com/linkedin/venice/common/Measurable.java +++ /dev/null @@ -1,5 +0,0 @@ -package com.linkedin.venice.common; - -public interface Measurable { - int getSize(); -} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/ControlMessageType.java b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/ControlMessageType.java index 9170b59b3a..a1a3e33566 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/ControlMessageType.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/protocol/enums/ControlMessageType.java @@ -1,19 +1,21 @@ package com.linkedin.venice.kafka.protocol.enums; -import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.exceptions.VeniceMessageException; import com.linkedin.venice.kafka.protocol.ControlMessage; import com.linkedin.venice.kafka.protocol.EndOfIncrementalPush; import com.linkedin.venice.kafka.protocol.EndOfPush; import com.linkedin.venice.kafka.protocol.EndOfSegment; +import com.linkedin.venice.kafka.protocol.StartOfBufferReplay; import com.linkedin.venice.kafka.protocol.StartOfIncrementalPush; import com.linkedin.venice.kafka.protocol.StartOfPush; import com.linkedin.venice.kafka.protocol.StartOfSegment; import com.linkedin.venice.kafka.protocol.TopicSwitch; import com.linkedin.venice.kafka.protocol.VersionSwap; +import com.linkedin.venice.memory.ClassSizeEstimator; import com.linkedin.venice.utils.EnumUtils; import com.linkedin.venice.utils.VeniceEnumValue; import java.util.List; +import java.util.function.Supplier; /** @@ -24,15 +26,35 @@ * not support evolution (i.e.: adding values) properly. 
*/ public enum ControlMessageType implements VeniceEnumValue { - START_OF_PUSH(0), END_OF_PUSH(1), START_OF_SEGMENT(2), END_OF_SEGMENT(3), @Deprecated - START_OF_BUFFER_REPLAY(4), START_OF_INCREMENTAL_PUSH(5), END_OF_INCREMENTAL_PUSH(6), TOPIC_SWITCH(7), VERSION_SWAP(8); + START_OF_PUSH(0, () -> new StartOfPush()), + + END_OF_PUSH(1, () -> new EndOfPush()), + + START_OF_SEGMENT(2, () -> new StartOfSegment()), + + END_OF_SEGMENT(3, () -> new EndOfSegment()), + + @Deprecated + START_OF_BUFFER_REPLAY(4, () -> new StartOfBufferReplay()), + + START_OF_INCREMENTAL_PUSH(5, () -> new StartOfIncrementalPush()), + + END_OF_INCREMENTAL_PUSH(6, () -> new EndOfIncrementalPush()), + + TOPIC_SWITCH(7, () -> new TopicSwitch()), + + VERSION_SWAP(8, () -> new VersionSwap()); /** The value is the byte used on the wire format */ private final int value; + private final Supplier constructor; + private final int shallowClassOverhead; private static final List TYPES = EnumUtils.getEnumValuesList(ControlMessageType.class); - ControlMessageType(int value) { + ControlMessageType(int value, Supplier constructor) { this.value = value; + this.constructor = constructor; + this.shallowClassOverhead = ClassSizeEstimator.getClassOverhead(constructor.get().getClass()); } @Override @@ -54,27 +76,7 @@ public int getValue() { * - {@link VersionSwap} */ public Object getNewInstance() { - switch (valueOf(value)) { - case START_OF_PUSH: - return new StartOfPush(); - case END_OF_PUSH: - return new EndOfPush(); - case START_OF_SEGMENT: - return new StartOfSegment(); - case END_OF_SEGMENT: - return new EndOfSegment(); - case START_OF_INCREMENTAL_PUSH: - return new StartOfIncrementalPush(); - case END_OF_INCREMENTAL_PUSH: - return new EndOfIncrementalPush(); - case TOPIC_SWITCH: - return new TopicSwitch(); - case VERSION_SWAP: - return new VersionSwap(); - - default: - throw new VeniceException("Unsupported " + getClass().getSimpleName() + " value: " + value); - } + return this.constructor.get(); } public static ControlMessageType valueOf(int value) { @@ -84,4 +86,8 @@ public static ControlMessageType valueOf(int value) { public static ControlMessageType valueOf(ControlMessage controlMessage) { return valueOf(controlMessage.controlMessageType); } + + public int getShallowClassOverhead() { + return this.shallowClassOverhead; + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/memory/ClassSizeEstimator.java b/internal/venice-common/src/main/java/com/linkedin/venice/memory/ClassSizeEstimator.java new file mode 100644 index 0000000000..a03f3eea67 --- /dev/null +++ b/internal/venice-common/src/main/java/com/linkedin/venice/memory/ClassSizeEstimator.java @@ -0,0 +1,250 @@ +package com.linkedin.venice.memory; + +import com.linkedin.venice.utils.Utils; +import com.sun.management.HotSpotDiagnosticMXBean; +import java.lang.management.ManagementFactory; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.ArrayList; +import java.util.List; +import javax.annotation.Nonnull; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +/** + * Utility class to help in implementing {@link Measurable#getHeapSize()}. A couple of important points: + * + * 1. This utility class does not "measure" the heap size, but rather attempts to "predict" it, based on knowledge of + * the internals of the JVM. If any of the assumptions are wrong, then of course the results will be inaccurate. + * 2. This utility class assumes we are using the HotSpot JVM. 
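+ *
+ * As an illustration of what "predicting" means here (a hedged example, based on typical 64-bit HotSpot defaults
+ * with compressed oops and compressed class pointers): an object holding one int field and one reference field
+ * would be predicted to take 12 bytes (header) + 4 (int) + 4 (compressed reference) = 20 bytes, rounded up to
+ * 24 bytes by the 8-byte alignment.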
+ */ +public class ClassSizeEstimator { + private static final Logger LOGGER = LogManager.getLogger(ClassSizeEstimator.class); + private static final ClassValue KNOWN_SHALLOW_SIZES = new ClassValue() { + @Override + protected Integer computeValue(Class type) { + return computeClassOverhead(type); + } + }; + private static final boolean IS_64_BITS; + private static final boolean COMPRESSED_OOPS; + private static final boolean COMPRESSED_CLASS_POINTERS; + private static final int ALIGNMENT_SIZE; + private static final int OBJECT_HEADER_SIZE; + static final int ARRAY_HEADER_SIZE; + private static final int POINTER_SIZE; + private static final int JAVA_MAJOR_VERSION; + + static { + /** + * This prop name looks sketchy, but I've seen it mentioned in a couple of posts online, so I'm trusting it for + * now... In any case, if that property is not found, then we'll default to 64 bits, which is a conservative + * assumption (i.e., it would cause us to over-estimate the size on heap)... + */ + String propertyName = "sun.arch.data.model"; + String arch = System.getProperty(propertyName); + LOGGER.debug("System property {} has value: {}", propertyName, arch); + IS_64_BITS = (arch == null) || !arch.contains("32"); + COMPRESSED_OOPS = getBooleanVmOption("UseCompressedOops"); + COMPRESSED_CLASS_POINTERS = getBooleanVmOption("UseCompressedClassPointers"); + ALIGNMENT_SIZE = IS_64_BITS ? 8 : 4; // Also serves as the object header's "mark word" size + OBJECT_HEADER_SIZE = ALIGNMENT_SIZE + (COMPRESSED_CLASS_POINTERS ? 4 : 8); + /** + * The "array base" is always word-aligned, which under some circumstances (in 32 bits JVMs, or in 64 bits JVMs + * where {@link COMPRESSED_OOPS} is false, either due to a large heap or to explicit disabling) causes "internal + * space loss". + * + * See: https://shipilev.net/jvm/objects-inside-out/#_observation_array_base_is_aligned + */ + ARRAY_HEADER_SIZE = roundUpToNearest(OBJECT_HEADER_SIZE + Integer.BYTES, ALIGNMENT_SIZE); + POINTER_SIZE = COMPRESSED_OOPS ? 4 : 8; + JAVA_MAJOR_VERSION = Utils.getJavaMajorVersion(); + } + + private ClassSizeEstimator() { + // Static utility + } + + /** + * This function provides the "shallow size" of each instance of the given class, meaning that all pointers are + * considered to be null. + * + * Intended usage: This function is a helper to make it easier to implement {@link Measurable#getHeapSize()}. It + * should not be called on the hot path. Rather, it should be called at most once per JVM runtime per class of + * interest and the result should be stored in a static field. If an instance of a measured class contains only + * primitive fields or all its non-primitive fields are null, then the result of this function is effectively the heap + * size of that instance. If these conditions are not true, then a function should be implemented which uses this + * class overhead as a base and then adds the size of any referenced Objects. For example, see + * {@link InstanceSizeEstimator#getObjectSize(Object)}. + * + * @param c The {@link Class} for which to predict the "shallow overhead", as defined above. + * @return The base overhead of the class, which can be any positive number (including zero). 
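+ *
+ * A typical call site caches the result in a static field, e.g. (as {@link com.linkedin.venice.message.KafkaKey} does):
+ * {@code private static final int SHALLOW_CLASS_OVERHEAD = ClassSizeEstimator.getClassOverhead(KafkaKey.class);}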
+ */ + public static int getClassOverhead(@Nonnull final Class c) { + Integer knownSize = KNOWN_SHALLOW_SIZES.get(c); + if (knownSize != null) { + return knownSize; + } + return computeClassOverhead(c); + } + + private static int computeClassOverhead(@Nonnull final Class c) { + if (c == null) { + throw new NullPointerException("The class param must not be null."); + } + + if (c.isEnum()) { + /** + * Each enum value is stored statically, and so the per-instance overhead of enum fields is only the pointer size, + * already taken into account in the field loop below. + */ + return 0; + } + + if (c.isPrimitive()) { + return getPrimitiveSize(c); + } + + int size = c.isArray() ? ARRAY_HEADER_SIZE : OBJECT_HEADER_SIZE; + + /** + * We need to measure the overhead of fields for the passed in class as well as all parents. The order in which we + * traverse these classes matters in older Java versions (prior to 15) and not in newer ones, but for the sake of + * minimizing code size, we will always iterate in the order relevant to older Java versions. + */ + List classHierarchyFromSubclassToParent = new ArrayList<>(); + classHierarchyFromSubclassToParent.add(c); + Class parentClass = c.getSuperclass(); + while (parentClass != null) { + classHierarchyFromSubclassToParent.add(parentClass); + parentClass = parentClass.getSuperclass(); + } + + // Iterate from the end to the beginning, so we go from parent to sub + for (int i = classHierarchyFromSubclassToParent.size() - 1; i >= 0; i--) { + int classFieldsOverhead = overheadOfFields(classHierarchyFromSubclassToParent.get(i)); + if (classFieldsOverhead == 0) { + continue; + } + size += classFieldsOverhead; + + if (JAVA_MAJOR_VERSION < 15) { + /** + * In older Java versions, the field layout would always order the fields from the parent class to subclass. + * Within a class, the fields could be re-ordered to optimize packing the fields within the "alignment shadow", + * but not across classes of the hierarchy. + * + * See: https://shipilev.net/jvm/objects-inside-out/#_superclass_gaps + */ + if (i > 0) { + /** + * We align for each class, except the last one, since we'll take care of it below. BUT, importantly, at this + * stage we align by pointer size, NOT by alignment size. + */ + size = roundUpToNearest(size, POINTER_SIZE); + } + } + } + + // We align once at the end no matter the Java version + size = roundUpToNearestAlignment(size); + + return size; + } + + /** Based on: https://shipilev.net/jvm/objects-inside-out/#_data_types_and_their_representation */ + private static int getPrimitiveSize(Class c) { + if (c.isPrimitive()) { + if (c.equals(boolean.class)) { + return 1; + } else if (c.equals(byte.class)) { + return Byte.BYTES; + } else if (c.equals(char.class)) { + return Character.BYTES; + } else if (c.equals(short.class)) { + return Short.BYTES; + } else if (c.equals(int.class)) { + return Integer.BYTES; + } else if (c.equals(float.class)) { + return Float.BYTES; + } else if (c.equals(long.class)) { + return Long.BYTES; + } else if (c.equals(double.class)) { + return Double.BYTES; + } + + // Defensive code + throw new IllegalStateException( + "Class " + c.getSimpleName() + + " is said to be a primitive but does not conform to any known primitive type!"); + } else { + throw new IllegalArgumentException("Class " + c.getSimpleName() + " is not a primitive!"); + } + } + + static int roundUpToNearestAlignment(int size) { + return roundUpToNearest(size, ALIGNMENT_SIZE); + } + + /** Deal with alignment by rounding up to the nearest boundary. 
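+   * For example, with an 8-byte alignment, roundUpToNearest(17, 8) returns 24 while roundUpToNearest(16, 8) returns
+   * 16, since the latter is already aligned.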
*/ + private static int roundUpToNearest(int size, int intervalSize) { + int partialAlignmentWindowUsage = size % intervalSize; + int waste = partialAlignmentWindowUsage == 0 ? 0 : intervalSize - partialAlignmentWindowUsage; + int finalSize = size + waste; + return finalSize; + } + + private static int overheadOfFields(Class c) { + int size = 0; + + /** + * {@link Class#getDeclaredFields()} returns the fields (of all visibility, from private to public and everything + * in between) of the class, but not of its parent class. + */ + for (Field f: c.getDeclaredFields()) { + if (Modifier.isStatic(f.getModifiers())) { + continue; + } + Class fieldClass = f.getType(); + if (fieldClass.isPrimitive()) { + size += KNOWN_SHALLOW_SIZES.get(fieldClass); + } else { + /** + * Only primitives are stored in-line within the object, while all non-primitives are stored elsewhere on the + * heap, with a pointer within the object to reference them. + */ + size += POINTER_SIZE; + } + } + + /** + * N.B.: The output of this function could be cached, though we're currently not doing it since the intent is not + * to call this on the hat path anyway. + */ + + return size; + } + + /** Package-private on purpose, for tests... */ + static boolean is64bitsJVM() { + return IS_64_BITS; + } + + /** Package-private on purpose, for tests... */ + static boolean isUseCompressedOopsEnabled() { + return COMPRESSED_OOPS; + } + + /** Package-private on purpose, for tests... */ + static boolean isCompressedKlassPointersEnabled() { + return COMPRESSED_CLASS_POINTERS; + } + + private static boolean getBooleanVmOption(String optionName) { + String optionValue = + ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class).getVMOption(optionName).getValue(); + LOGGER.debug("VM option {} has value: {}", optionName, optionValue); + return Boolean.parseBoolean(optionValue); + } +} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/memory/InstanceSizeEstimator.java b/internal/venice-common/src/main/java/com/linkedin/venice/memory/InstanceSizeEstimator.java new file mode 100644 index 0000000000..0c67049ae9 --- /dev/null +++ b/internal/venice-common/src/main/java/com/linkedin/venice/memory/InstanceSizeEstimator.java @@ -0,0 +1,180 @@ +package com.linkedin.venice.memory; + +import static com.linkedin.venice.memory.ClassSizeEstimator.ARRAY_HEADER_SIZE; +import static com.linkedin.venice.memory.ClassSizeEstimator.getClassOverhead; + +import com.linkedin.venice.kafka.protocol.ControlMessage; +import com.linkedin.venice.kafka.protocol.Delete; +import com.linkedin.venice.kafka.protocol.GUID; +import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; +import com.linkedin.venice.kafka.protocol.LeaderMetadata; +import com.linkedin.venice.kafka.protocol.ProducerMetadata; +import com.linkedin.venice.kafka.protocol.Put; +import com.linkedin.venice.kafka.protocol.Update; +import com.linkedin.venice.kafka.protocol.enums.ControlMessageType; +import com.linkedin.venice.kafka.protocol.enums.MessageType; +import com.linkedin.venice.writer.VeniceWriter; +import java.nio.ByteBuffer; +import java.util.function.ToIntFunction; +import javax.annotation.Nonnull; + + +/** + * This utility class provides functions to measure the heap size of objects for a limited number of classes. Wherever + * possible, the logic makes use of knowledge of the Venice code so that shared or static instances are ignored (i.e., + * assuming that their amortized cost is negligible). 
The code here should all be hot path friendly, and make no use of + * reflection (except in static constants). + * + * If more classes require heap size measurement, the preferred approach is NOT to add code into this class! Rather, the + * preferred approach is to implement {@link Measurable} and write the logic in {@link Measurable#getHeapSize()}. This + * utility class is for cases where we wish to measure classes that we cannot modify, such as those coming from the JDK, + * from third-party libraries, or from code-gen. + */ +public class InstanceSizeEstimator { + private static final ClassValue> CACHE = new ClassValue>() { + @Override + protected ToIntFunction computeValue(Class type) { + if (Measurable.class.isAssignableFrom(type)) { + return value -> ((Measurable) value).getHeapSize(); + } else if (Put.class.isAssignableFrom(type)) { + return value -> getSize((Put) value); + } else if (Delete.class.isAssignableFrom(type)) { + return value -> getSize((Delete) value); + } else if (Update.class.isAssignableFrom(type)) { + return value -> getSize((Update) value); + } else if (ControlMessage.class.isAssignableFrom(type)) { + return value -> getSize((ControlMessage) value); + } else if (KafkaMessageEnvelope.class.isAssignableFrom(type)) { + return value -> getSize((KafkaMessageEnvelope) value); + } else if (ProducerMetadata.class.isAssignableFrom(type)) { + return value -> getSize((ProducerMetadata) value); + } else if (ByteBuffer.class.isAssignableFrom(type)) { + return value -> getSize((ByteBuffer) value); + } else if (type.isArray() && type.getComponentType().equals(byte.class)) { + return value -> getSize((byte[]) value); + } + return null; + } + }; + private static final int GUID_FULL_CLASS_OVERHEAD = + getClassOverhead(GUID.class) + getByteArraySizeByLength(GUID.getClassSchema().getFixedSize()); + private static final int PRODUCER_METADATA_FULL_CLASS_OVERHEAD = + getClassOverhead(ProducerMetadata.class) + GUID_FULL_CLASS_OVERHEAD; + private static final int KME_PARTIAL_CLASS_OVERHEAD = + getClassOverhead(KafkaMessageEnvelope.class) + PRODUCER_METADATA_FULL_CLASS_OVERHEAD; + private static final int PUT_SHALLOW_CLASS_OVERHEAD = getClassOverhead(Put.class); + private static final int UPDATE_SHALLOW_CLASS_OVERHEAD = getClassOverhead(Update.class); + private static final int DELETE_SHALLOW_CLASS_OVERHEAD = getClassOverhead(Delete.class); + private static final int CONTROL_MESSAGE_SHALLOW_CLASS_OVERHEAD = getClassOverhead(ControlMessage.class); + private static final int LEADER_METADATA_SHALLOW_CLASS_OVERHEAD = getClassOverhead(LeaderMetadata.class); + private static final int BYTE_BUFFER_SHALLOW_CLASS_OVERHEAD = getClassOverhead(ByteBuffer.class); + + private InstanceSizeEstimator() { + // Static utility + } + + /** + * Works for {@link Measurable} objects and a small number of other types. + * + * Not intended as a generic utility for any instance type! + * + * @throws IllegalArgumentException when an unsupported type is passed. 
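+   *
+   * For example, {@code getObjectSize(new byte[10])} resolves to {@link #getSize(byte[])} and returns the array
+   * header size plus ten bytes of payload, rounded up to the alignment boundary, whereas a {@link Measurable}
+   * argument simply delegates to its own {@link Measurable#getHeapSize()}.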
+ */ + public static int getObjectSize(@Nonnull Object o) { + if (o instanceof Measurable) { + return ((Measurable) o).getHeapSize(); + } + ToIntFunction sizeComputation = CACHE.get(o.getClass()); + if (sizeComputation == null) { + throw new IllegalArgumentException("Object of type " + o.getClass() + " is not measurable!"); + } + return sizeComputation.applyAsInt(o); + } + + public static int getByteArraySizeByLength(int length) { + return ClassSizeEstimator.roundUpToNearestAlignment(ARRAY_HEADER_SIZE + length); + } + + public static int getSize(@Nonnull byte[] bytes) { + return getByteArraySizeByLength(bytes.length); + } + + public static int getSize(ByteBuffer byteBuffer) { + if (byteBuffer == null) { + return 0; + } + if (byteBuffer.hasArray()) { + return BYTE_BUFFER_SHALLOW_CLASS_OVERHEAD + getByteArraySizeByLength(byteBuffer.capacity()); + } + + throw new IllegalArgumentException("Only array-backed ByteBuffers are measurable with this function."); + } + + public static int getSize(@Nonnull ProducerMetadata producerMetadata) { + return PRODUCER_METADATA_FULL_CLASS_OVERHEAD; + } + + public static int getSize(@Nonnull Put put) { + int size = PUT_SHALLOW_CLASS_OVERHEAD; + size += getSize(put.putValue); + if (put.replicationMetadataPayload != null) { + size += BYTE_BUFFER_SHALLOW_CLASS_OVERHEAD; + if (put.replicationMetadataPayload.array() != put.putValue.array()) { + /** + * N.B.: When using the {@link org.apache.avro.io.OptimizedBinaryDecoder}, the {@link put.putValue} and the + * {@link put.replicationMetadataPayload} will be backed by the same underlying array. If that is the + * case, then we don't want to account for the capacity twice. + */ + size += put.replicationMetadataPayload.capacity(); + } + } + return size; + } + + public static int getSize(@Nonnull Delete delete) { + return DELETE_SHALLOW_CLASS_OVERHEAD + getSize(delete.replicationMetadataPayload); + } + + /** + * This function is imprecise in a couple of ways. The {@link ControlMessage#controlMessageUnion} field is treated as + * shallow, which in some cases is false (e.g. if a compression dictionary were present), and the + * {@link ControlMessage#debugInfo} is ignored (i.e. treated as null). + * + * We can be more precise by looking at the {@link ControlMessage#controlMessageType} and then providing the precise + * overhead based on each type, but we're skipping this work for now since, in our use case, control messages should + * be a negligible fraction of all messages, and therefore not that important to get exactly right. + */ + public static int getSize(@Nonnull ControlMessage cm) { + return CONTROL_MESSAGE_SHALLOW_CLASS_OVERHEAD + ControlMessageType.valueOf(cm).getShallowClassOverhead(); + } + + public static int getSize(@Nonnull Update update) { + return UPDATE_SHALLOW_CLASS_OVERHEAD + getSize(update.updateValue); + } + + /** + * Measure the heap usage of {@link KafkaMessageEnvelope}. + */ + public static int getSize(KafkaMessageEnvelope kme) { + int size = KME_PARTIAL_CLASS_OVERHEAD; + switch (MessageType.valueOf(kme)) { + case PUT: + size += getSize((Put) kme.payloadUnion); + break; + case DELETE: + size += getSize((Delete) kme.payloadUnion); + break; + case CONTROL_MESSAGE: + size += getSize((ControlMessage) kme.payloadUnion); + break; + case UPDATE: + size += getSize((Update) kme.payloadUnion); + break; + } + if (kme.leaderMetadataFooter != null && !(kme.leaderMetadataFooter instanceof VeniceWriter.DefaultLeaderMetadata)) { + /** The host name part of this object should always be shared, hence we ignore it. 
*/ + size += LEADER_METADATA_SHALLOW_CLASS_OVERHEAD; + } + return size; + } +} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/memory/Measurable.java b/internal/venice-common/src/main/java/com/linkedin/venice/memory/Measurable.java new file mode 100644 index 0000000000..f61713173c --- /dev/null +++ b/internal/venice-common/src/main/java/com/linkedin/venice/memory/Measurable.java @@ -0,0 +1,5 @@ +package com.linkedin.venice.memory; + +public interface Measurable { + int getHeapSize(); +} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/message/KafkaKey.java b/internal/venice-common/src/main/java/com/linkedin/venice/message/KafkaKey.java index 163227f4d5..ae5971c7f0 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/message/KafkaKey.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/message/KafkaKey.java @@ -3,6 +3,9 @@ import com.linkedin.venice.guid.HeartbeatGuidV3Generator; import com.linkedin.venice.kafka.protocol.GUID; import com.linkedin.venice.kafka.protocol.enums.MessageType; +import com.linkedin.venice.memory.ClassSizeEstimator; +import com.linkedin.venice.memory.InstanceSizeEstimator; +import com.linkedin.venice.memory.Measurable; import com.linkedin.venice.utils.ByteUtils; import java.nio.ByteBuffer; import javax.annotation.Nonnull; @@ -13,7 +16,8 @@ * Class which stores the components of a Kafka Key, and is the format specified in the * {@link com.linkedin.venice.serialization.KafkaKeySerializer}. */ -public class KafkaKey { +public class KafkaKey implements Measurable { + private static final int SHALLOW_CLASS_OVERHEAD = ClassSizeEstimator.getClassOverhead(KafkaKey.class); /** * For control messages, the Key part of the {@link KafkaKey} includes the producer GUID, segment and sequence number. * @@ -32,11 +36,11 @@ public class KafkaKey { private final byte keyHeaderByte; private final byte[] key; // TODO: Consider whether we may want to use a ByteBuffer here - public KafkaKey(@Nonnull MessageType messageType, byte[] key) { + public KafkaKey(@Nonnull MessageType messageType, @Nonnull byte[] key) { this(messageType.getKeyHeaderByte(), key); } - public KafkaKey(byte keyHeaderByte, byte[] key) { + public KafkaKey(byte keyHeaderByte, @Nonnull byte[] key) { this.keyHeaderByte = keyHeaderByte; this.key = key; } @@ -78,9 +82,7 @@ public String toString() { + ByteUtils.toHexString(key) + ")"; } - public int getEstimatedObjectSizeOnHeap() { - // This constant is the estimated size of the enclosing object + the byte[]'s overhead. - // TODO: Find a library that would allow us to precisely measure this and store it in a static constant. 
- return getKeyLength() + 36; + public int getHeapSize() { + return SHALLOW_CLASS_OVERHEAD + InstanceSizeEstimator.getSize(this.key); } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/ImmutablePubSubMessage.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/ImmutablePubSubMessage.java index b15ec83e4f..caa73c591e 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/ImmutablePubSubMessage.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/ImmutablePubSubMessage.java @@ -1,5 +1,7 @@ package com.linkedin.venice.pubsub; +import com.linkedin.venice.memory.ClassSizeEstimator; +import com.linkedin.venice.memory.InstanceSizeEstimator; import com.linkedin.venice.pubsub.api.PubSubMessage; import com.linkedin.venice.pubsub.api.PubSubMessageHeaders; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; @@ -7,6 +9,7 @@ public class ImmutablePubSubMessage implements PubSubMessage { + private static final int SHALLOW_CLASS_OVERHEAD = ClassSizeEstimator.getClassOverhead(ImmutablePubSubMessage.class); private final K key; private final V value; private final PubSubTopicPartition topicPartition; @@ -87,4 +90,11 @@ public PubSubMessageHeaders getPubSubMessageHeaders() { public String toString() { return "PubSubMessage{" + topicPartition + ", offset=" + offset + ", timestamp=" + timestamp + '}'; } + + @Override + public int getHeapSize() { + /** The {@link #topicPartition} is supposed to be a shared instance, and is therefore ignored. */ + return SHALLOW_CLASS_OVERHEAD + InstanceSizeEstimator.getObjectSize(key) + + InstanceSizeEstimator.getObjectSize(value); + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/kafka/ApacheKafkaUtils.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/kafka/ApacheKafkaUtils.java index 9b0ea6b3a6..f024e4e404 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/kafka/ApacheKafkaUtils.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/kafka/ApacheKafkaUtils.java @@ -1,5 +1,6 @@ package com.linkedin.venice.pubsub.adapter.kafka; +import com.linkedin.venice.pubsub.api.PubSubMessageHeader; import com.linkedin.venice.pubsub.api.PubSubMessageHeaders; import org.apache.kafka.common.header.internals.RecordHeaders; @@ -7,12 +8,18 @@ public class ApacheKafkaUtils { public static final RecordHeaders EMPTY_RECORD_HEADERS = new RecordHeaders(); + static { + EMPTY_RECORD_HEADERS.setReadOnly(); + } + public static RecordHeaders convertToKafkaSpecificHeaders(PubSubMessageHeaders headers) { - if (headers == null) { + if (headers == null || headers.isEmpty()) { return EMPTY_RECORD_HEADERS; } RecordHeaders recordHeaders = new RecordHeaders(); - headers.toList().forEach(header -> recordHeaders.add(header.key(), header.value())); + for (PubSubMessageHeader header: headers) { + recordHeaders.add(header.key(), header.value()); + } return recordHeaders; } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/EmptyPubSubMessageHeaders.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/EmptyPubSubMessageHeaders.java index cea4522bcf..141f025e5d 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/EmptyPubSubMessageHeaders.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/EmptyPubSubMessageHeaders.java @@ -31,4 +31,14 @@ public PubSubMessageHeaders remove(String key) { 
public List toList() { return Collections.emptyList(); } + + public boolean isEmpty() { + return true; + } + + @Override + public int getHeapSize() { + // This is the point of using a singleton! + return 0; + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessage.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessage.java index c910efb11e..41c3167ed4 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessage.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessage.java @@ -1,6 +1,9 @@ package com.linkedin.venice.pubsub.api; -public interface PubSubMessage { +import com.linkedin.venice.memory.Measurable; + + +public interface PubSubMessage extends Measurable { /** * @return the key part of this message */ diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java index 88c539da53..36970faa78 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java @@ -57,7 +57,7 @@ public PubSubMessage deserialize( KafkaKey key = keySerializer.deserialize(null, keyBytes); KafkaMessageEnvelope value = null; if (key.isControlMessage()) { - for (PubSubMessageHeader header: headers.toList()) { + for (PubSubMessageHeader header: headers) { // only process VENICE_TRANSPORT_PROTOCOL_HEADER here. Other headers will be stored in // ImmutablePubSubMessage and used down the ingestion path later if (header.key().equals(VENICE_TRANSPORT_PROTOCOL_HEADER)) { diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeader.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeader.java index d4a604d617..49e7e40b43 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeader.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeader.java @@ -1,6 +1,6 @@ package com.linkedin.venice.pubsub.api; -import com.linkedin.venice.common.Measurable; +import com.linkedin.venice.memory.Measurable; import java.util.Arrays; import java.util.Objects; @@ -47,7 +47,7 @@ public boolean equals(Object otherObj) { * TODO: the following estimation doesn't consider the overhead of the internal structure. 
*/ @Override - public int getSize() { + public int getHeapSize() { return key.length() + value.length; } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeaders.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeaders.java index 828f766948..aa9ce1e042 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeaders.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageHeaders.java @@ -1,22 +1,26 @@ package com.linkedin.venice.pubsub.api; -import com.linkedin.venice.common.Measurable; +import static com.linkedin.venice.memory.ClassSizeEstimator.getClassOverhead; + +import com.linkedin.venice.memory.Measurable; +import com.linkedin.venice.utils.collections.MeasurableLinkedHashMap; import java.util.ArrayList; -import java.util.LinkedHashMap; +import java.util.Collections; +import java.util.Iterator; import java.util.List; -import java.util.Map; /** * Set of key-value pairs to tagged with messages produced to a topic. * In case of headers with the same key, only the most recently added headers value will be kept. */ -public class PubSubMessageHeaders implements Measurable { +public class PubSubMessageHeaders implements Measurable, Iterable { + private static final int SHALLOW_CLASS_OVERHEAD = getClassOverhead(PubSubMessageHeaders.class); /** * N.B.: Kafka allows duplicate keys in the headers but some pubsub systems may not * allow it. Hence, we will enforce uniqueness of keys in headers from the beginning. */ - private final Map headers = new LinkedHashMap<>(); + private final MeasurableLinkedHashMap headers = new MeasurableLinkedHashMap<>(); public static final String VENICE_TRANSPORT_PROTOCOL_HEADER = "vtp"; /** Header to denote whether the leader is completed or not */ @@ -43,18 +47,20 @@ public PubSubMessageHeaders remove(String key) { * If no headers are present an empty list is returned. */ public List toList() { - return new ArrayList<>(headers.values()); + return headers.isEmpty() ? Collections.emptyList() : new ArrayList<>(headers.values()); + } + + public boolean isEmpty() { + return headers.isEmpty(); + } + + @Override + public int getHeapSize() { + return SHALLOW_CLASS_OVERHEAD + this.headers.getHeapSize(); } - /** - * TODO: the following estimation doesn't consider the overhead of the internal structure. 
- */ @Override - public int getSize() { - int size = 0; - for (Map.Entry entry: headers.entrySet()) { - size += entry.getKey().length() + entry.getValue().getSize(); - } - return size; + public Iterator iterator() { + return headers.values().iterator(); } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubProducerAdapterConcurrentDelegator.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubProducerAdapterConcurrentDelegator.java index aa59261448..c0b019d4ed 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubProducerAdapterConcurrentDelegator.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubProducerAdapterConcurrentDelegator.java @@ -1,11 +1,13 @@ package com.linkedin.venice.pubsub.api; -import com.linkedin.venice.common.Measurable; +import static com.linkedin.venice.memory.ClassSizeEstimator.getClassOverhead; + import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; +import com.linkedin.venice.memory.InstanceSizeEstimator; +import com.linkedin.venice.memory.Measurable; import com.linkedin.venice.message.KafkaKey; import com.linkedin.venice.utils.DaemonThreadFactory; -import com.linkedin.venice.utils.ProtocolUtils; import com.linkedin.venice.utils.collections.MemoryBoundBlockingQueue; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import it.unimi.dsi.fastutil.objects.Object2DoubleMap; @@ -120,8 +122,9 @@ public PubSubProducerAdapterConcurrentDelegator( } public static class ProducerQueueNode implements Measurable { - // Rough estimation. - private static final int PRODUCER_QUEUE_NODE_OVERHEAD = 256; + /** We assume that the {@link #produceFuture} is empty, and the {@link #topic} is a shared instance. */ + private static final int PRODUCER_QUEUE_NODE_PARTIAL_OVERHEAD = + getClassOverhead(ProducerQueueNode.class) + getClassOverhead(CompletableFuture.class); private final String topic; private final int partition; @@ -149,10 +152,13 @@ public ProducerQueueNode( } @Override - public int getSize() { - return topic.length() + 4 + key.getEstimatedObjectSizeOnHeap() - + ProtocolUtils.getEstimateOfMessageEnvelopeSizeOnHeap(value) + headers.getSize() - + PRODUCER_QUEUE_NODE_OVERHEAD; + public int getHeapSize() { + int size = PRODUCER_QUEUE_NODE_PARTIAL_OVERHEAD + key.getHeapSize() + InstanceSizeEstimator.getSize(value) + + headers.getHeapSize(); + if (this.callback instanceof Measurable) { + size += ((Measurable) this.callback).getHeapSize(); + } + return size; } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/utils/ProtocolUtils.java b/internal/venice-common/src/main/java/com/linkedin/venice/utils/ProtocolUtils.java deleted file mode 100644 index 3b8ccf9271..0000000000 --- a/internal/venice-common/src/main/java/com/linkedin/venice/utils/ProtocolUtils.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.linkedin.venice.utils; - -import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; -import com.linkedin.venice.kafka.protocol.Put; -import com.linkedin.venice.kafka.protocol.Update; -import com.linkedin.venice.kafka.protocol.enums.MessageType; - - -public class ProtocolUtils { - /** - * Measure the heap usage of {@link KafkaMessageEnvelope}. - */ - public static int getEstimateOfMessageEnvelopeSizeOnHeap(KafkaMessageEnvelope messageEnvelope) { - int kmeBaseOverhead = 100; // Super rough estimate. 
TODO: Measure with a more precise library and store statically - switch (MessageType.valueOf(messageEnvelope)) { - case PUT: - Put put = (Put) messageEnvelope.payloadUnion; - int size = put.putValue.capacity(); - if (put.replicationMetadataPayload != null - /** - * N.B.: When using the {@link org.apache.avro.io.OptimizedBinaryDecoder}, the {@link put.putValue} and the - * {@link put.replicationMetadataPayload} will be backed by the same underlying array. If that is the - * case, then we don't want to account for the capacity twice. - */ - && put.replicationMetadataPayload.array() != put.putValue.array()) { - size += put.replicationMetadataPayload.capacity(); - } - return size + kmeBaseOverhead; - case UPDATE: - Update update = (Update) messageEnvelope.payloadUnion; - return update.updateValue.capacity() + kmeBaseOverhead; - default: - return kmeBaseOverhead; - } - } -} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/utils/collections/MeasurableLinkedHashMap.java b/internal/venice-common/src/main/java/com/linkedin/venice/utils/collections/MeasurableLinkedHashMap.java new file mode 100644 index 0000000000..cffeb90fc8 --- /dev/null +++ b/internal/venice-common/src/main/java/com/linkedin/venice/utils/collections/MeasurableLinkedHashMap.java @@ -0,0 +1,107 @@ +package com.linkedin.venice.utils.collections; + +import static com.linkedin.venice.memory.ClassSizeEstimator.getClassOverhead; +import static com.linkedin.venice.memory.InstanceSizeEstimator.getByteArraySizeByLength; + +import com.linkedin.venice.memory.Measurable; +import java.util.LinkedHashMap; + + +/** + * A subclass of {@link LinkedHashMap} which does a best-effort attempt at guessing its size on heap + the size of its + * values. Several assumptions are made which could make this imprecise in some contexts. See the usage in + * {@link com.linkedin.venice.pubsub.api.PubSubMessageHeaders} for the usage it was originally intended for... + */ +public class MeasurableLinkedHashMap extends LinkedHashMap implements Measurable { + private static final int SHALLOW_CLASS_OVERHEAD = getClassOverhead(MeasurableLinkedHashMap.class); + private static final int ENTRY_CLASS_OVERHEAD = getClassOverhead(PseudoLinkedHashMapEntry.class); + /** + * The reason to have {@link PseudoLinkedValues} is that if we call {@link #getHeapSize()} then we will iterate over + * the values and this causes a cached view of the map to be held, thus creating one more object... + */ + private static final int LINKED_VALUES_SHALLOW_CLASS_OVERHEAD = + getClassOverhead(new MeasurableLinkedHashMap().getPseudoLinkedValues().getClass()); + + /** + * The default initial capacity - MUST be a power of two. + */ + private static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16 + + /** + * The maximum capacity, used if a higher value is implicitly specified + * by either of the constructors with arguments. + * MUST be a power of two <= 1<<30. + */ + private static final int MAXIMUM_CAPACITY = 1 << 30; + + /** + * The load factor used when none specified in constructor. + */ + private static final float DEFAULT_LOAD_FACTOR = 0.75f; + + private int guessCurrentlyAllocatedTableSize() { + if (isEmpty()) { + /** + * This is not necessarily true... if a map had entries in it, and then they got all removed, then the table would + * be empty but its capacity would still be whatever it was when it had the most entries, since maps grow but + * don't shrink. 
Tracking this accurately would require making this subclass stateful, which would add complexity + * and may not be a good trade. So we take a naive approach instead. + */ + return 0; + } + + int assumedCapacity = DEFAULT_INITIAL_CAPACITY; + int assumedThreshold = (int) (DEFAULT_LOAD_FACTOR * (float) assumedCapacity); + while (assumedCapacity <= MAXIMUM_CAPACITY) { + if (size() < assumedThreshold) { + return assumedCapacity; + } + + // double assumed capacity and threshold + assumedCapacity = assumedCapacity << 1; + assumedThreshold = assumedThreshold << 1; + } + return MAXIMUM_CAPACITY; + } + + @Override + public int getHeapSize() { + int size = SHALLOW_CLASS_OVERHEAD; + int tableSize = getByteArraySizeByLength(guessCurrentlyAllocatedTableSize()); + if (tableSize > 0) { + size += tableSize; + size += size() * ENTRY_CLASS_OVERHEAD; + for (V value: values()) { + /** + * N.B.: We only consider the value size, and ignore the key size. In our initial use case for this class, this + * is okay, though technically not correct. + */ + size += value.getHeapSize(); + } + /** + * Unfortunately, the act of calling {@link #values()} ends up generating a new object, and storing it in a class + * field, so we must now account for it... + */ + size += LINKED_VALUES_SHALLOW_CLASS_OVERHEAD; + } + + return size; + } + + // The classes below are for the sake of measuring overhead, since the equivalent JDK classes have limited visibility + static class PseudoHashMapNode { + int hash; + Object key, value, next; + } + + static class PseudoLinkedHashMapEntry extends PseudoHashMapNode { + Object before, after; + } + + private PseudoLinkedValues getPseudoLinkedValues() { + return new PseudoLinkedValues(); + } + + final class PseudoLinkedValues { + } +} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/utils/collections/MemoryBoundBlockingQueue.java b/internal/venice-common/src/main/java/com/linkedin/venice/utils/collections/MemoryBoundBlockingQueue.java index 6e51a4d17d..8206d00016 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/utils/collections/MemoryBoundBlockingQueue.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/utils/collections/MemoryBoundBlockingQueue.java @@ -1,7 +1,9 @@ package com.linkedin.venice.utils.collections; -import com.linkedin.venice.common.Measurable; +import static com.linkedin.venice.memory.ClassSizeEstimator.getClassOverhead; + import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.memory.Measurable; import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; @@ -46,11 +48,9 @@ public class MemoryBoundBlockingQueue implements BlockingQueue { private static final Logger LOGGER = LogManager.getLogger(MemoryBoundBlockingQueue.class); /** - * Considering the node implementation: {@link java.util.LinkedList.Node}, the overhead - * is three references, which could be about 24 bytes, and the 'Node' object type itself could take 24 bytes. - * We can adjust this value later if necessary. + * Considering the node implementation: {@link java.util.LinkedList.Node}, the overhead is three references. 
*/ - public static final int LINKED_QUEUE_NODE_OVERHEAD_IN_BYTE = 48; + public static final int LINKED_LIST_NODE_SHALLOW_OVERHEAD = getClassOverhead(LinkedListQueueNodeSimulation.class); private final Queue queue; private final long memoryCapacityInByte; @@ -87,7 +87,7 @@ public long remainingMemoryCapacityInByte() { } private long getRecordSize(T record) { - return record.getSize() + LINKED_QUEUE_NODE_OVERHEAD_IN_BYTE; + return record.getHeapSize() + LINKED_LIST_NODE_SHALLOW_OVERHEAD; } @Override @@ -252,4 +252,11 @@ public int drainTo(Collection c) { public int drainTo(Collection c, int maxElements) { throw new VeniceException("Operation is not supported yet!"); } + + /** + * Only used to measure the size on heap of a class with three references. + */ + private static final class LinkedListQueueNodeSimulation { + private Object field1, field2, field3; + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/CompletableFutureCallback.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/CompletableFutureCallback.java index 501ef523e9..6da8e1459f 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/CompletableFutureCallback.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/CompletableFutureCallback.java @@ -1,5 +1,8 @@ package com.linkedin.venice.writer; +import static com.linkedin.venice.memory.ClassSizeEstimator.getClassOverhead; + +import com.linkedin.venice.memory.Measurable; import com.linkedin.venice.pubsub.api.PubSubProduceResult; import com.linkedin.venice.pubsub.api.PubSubProducerCallback; import java.util.concurrent.CompletableFuture; @@ -11,7 +14,10 @@ * changed and the callback will be called. The caller can pass a {@code CompletableFutureCallback} to a function * accepting a {@code Callback} parameter to get a {@code CompletableFuture} after the function returns. 
*/ -public class CompletableFutureCallback implements PubSubProducerCallback { +public class CompletableFutureCallback implements PubSubProducerCallback, Measurable { + private static final int SHALLOW_CLASS_OVERHEAD = getClassOverhead(CompletableFutureCallback.class); + private static final int COMPLETABLE_FUTURE_SHALLOW_CLASS_OVERHEAD = getClassOverhead(CompletableFuture.class); + private final CompletableFuture completableFuture; private PubSubProducerCallback callback = null; @@ -36,4 +42,16 @@ public PubSubProducerCallback getCallback() { public void setCallback(PubSubProducerCallback callback) { this.callback = callback; } + + @Override + public int getHeapSize() { + int size = SHALLOW_CLASS_OVERHEAD; + if (this.completableFuture != null) { + size += COMPLETABLE_FUTURE_SHALLOW_CLASS_OVERHEAD; + } + if (this.callback instanceof Measurable) { + size += ((Measurable) this.callback).getHeapSize(); + } + return size; + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/SendMessageErrorLoggerCallback.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/SendMessageErrorLoggerCallback.java index 23adbe403e..ac6ab2bc1e 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/SendMessageErrorLoggerCallback.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/SendMessageErrorLoggerCallback.java @@ -1,12 +1,16 @@ package com.linkedin.venice.writer; +import static com.linkedin.venice.memory.ClassSizeEstimator.getClassOverhead; + import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; +import com.linkedin.venice.memory.Measurable; import com.linkedin.venice.pubsub.api.PubSubProduceResult; import com.linkedin.venice.pubsub.api.PubSubProducerCallback; import org.apache.logging.log4j.Logger; -class SendMessageErrorLoggerCallback implements PubSubProducerCallback { +class SendMessageErrorLoggerCallback implements PubSubProducerCallback, Measurable { + private static final int SHALLOW_CLASS_OVERHEAD = getClassOverhead(SendMessageErrorLoggerCallback.class); private final KafkaMessageEnvelope value; private final Logger logger; @@ -25,4 +29,13 @@ public void onCompletion(PubSubProduceResult produceResult, Exception e) { e); } } + + /** + * N.B.: For this use case, the shallow overhead is expected to be the relevant size, since both of the fields should + * point to shared instances. 
+ */ + @Override + public int getHeapSize() { + return SHALLOW_CLASS_OVERHEAD; + } } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java index b88cdd8eee..5d8cd79cb2 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java @@ -261,7 +261,18 @@ public class VeniceWriter extends AbstractVeniceWriter { private final Map defaultDebugInfo; private final boolean elapsedTimeForClosingSegmentEnabled; private final Object[] partitionLocks; - private String writerId; + private final String writerId; + + public static class DefaultLeaderMetadata extends LeaderMetadata { + public DefaultLeaderMetadata(CharSequence hostName) { + this.hostName = hostName; + this.upstreamOffset = DEFAULT_LEADER_METADATA_WRAPPER.getUpstreamOffset(); + this.upstreamKafkaClusterId = DEFAULT_LEADER_METADATA_WRAPPER.getUpstreamKafkaClusterId(); + } + } + + /** Used to reduce memory allocation in cases where the metadata is always going to be the same. */ + private final DefaultLeaderMetadata defaultLeaderMetadata; private volatile boolean isClosed = false; private final Object closeLock = new Object(); @@ -342,13 +353,12 @@ public VeniceWriter( // if INSTANCE_ID is not set, we'd use "hostname:port" as the default writer id if (props.containsKey(INSTANCE_ID)) { this.writerId = props.getString(INSTANCE_ID); + } else if (props.containsKey(LISTENER_PORT)) { + this.writerId = Utils.getHostName() + ":" + props.getInt(LISTENER_PORT); } else { this.writerId = Utils.getHostName(); - - if (props.containsKey(LISTENER_PORT)) { - this.writerId += ":" + props.getInt(LISTENER_PORT); - } } + this.defaultLeaderMetadata = new DefaultLeaderMetadata(this.writerId); this.producerGUID = GuidUtils.getGUID(props); this.logger = LogManager.getLogger("VeniceWriter [" + GuidUtils.getHexFromGuid(producerGUID) + "]"); // Create a thread pool which can have max 2 threads. 
@@ -1435,7 +1445,7 @@ private PubSubMessageHeaders getHeaders( if (addLeaderCompleteState) { // copy protocolSchemaHeaders locally and add extra header for leaderCompleteState returnPubSubMessageHeaders = new PubSubMessageHeaders(); - for (PubSubMessageHeader header: pubSubMessageHeaders.toList()) { + for (PubSubMessageHeader header: pubSubMessageHeaders) { returnPubSubMessageHeaders.add(header); } returnPubSubMessageHeaders.add(getLeaderCompleteStateHeader(leaderCompleteState)); @@ -1948,10 +1958,14 @@ protected KafkaMessageEnvelope getKafkaMessageEnvelope( producerMetadata.messageTimestamp = time.getMilliseconds(); producerMetadata.logicalTimestamp = logicalTs; kafkaValue.producerMetadata = producerMetadata; - kafkaValue.leaderMetadataFooter = new LeaderMetadata(); - kafkaValue.leaderMetadataFooter.hostName = writerId; - kafkaValue.leaderMetadataFooter.upstreamOffset = leaderMetadataWrapper.getUpstreamOffset(); - kafkaValue.leaderMetadataFooter.upstreamKafkaClusterId = leaderMetadataWrapper.getUpstreamKafkaClusterId(); + if (leaderMetadataWrapper == DEFAULT_LEADER_METADATA_WRAPPER) { + kafkaValue.leaderMetadataFooter = this.defaultLeaderMetadata; + } else { + kafkaValue.leaderMetadataFooter = new LeaderMetadata(); + kafkaValue.leaderMetadataFooter.hostName = writerId; + kafkaValue.leaderMetadataFooter.upstreamOffset = leaderMetadataWrapper.getUpstreamOffset(); + kafkaValue.leaderMetadataFooter.upstreamKafkaClusterId = leaderMetadataWrapper.getUpstreamKafkaClusterId(); + } return kafkaValue; } diff --git a/internal/venice-common/src/test/java/com/linkedin/davinci/kafka/consumer/SBSQueueNodeFactory.java b/internal/venice-common/src/test/java/com/linkedin/davinci/kafka/consumer/SBSQueueNodeFactory.java new file mode 100644 index 0000000000..66df20ab76 --- /dev/null +++ b/internal/venice-common/src/test/java/com/linkedin/davinci/kafka/consumer/SBSQueueNodeFactory.java @@ -0,0 +1,46 @@ +package com.linkedin.davinci.kafka.consumer; + +import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; +import com.linkedin.venice.message.KafkaKey; +import com.linkedin.venice.pubsub.api.PubSubMessage; + + +/** + * Simple utility class to get access to the package-private classes in {@link StoreBufferService} and their + * constructors. 
+ */ +public class SBSQueueNodeFactory { + public static StoreBufferService.QueueNode queueNode( + PubSubMessage consumerRecord, + StoreIngestionTask ingestionTask, + String kafkaUrl, + long beforeProcessingRecordTimestampNs) { + return new StoreBufferService.QueueNode(consumerRecord, ingestionTask, kafkaUrl, beforeProcessingRecordTimestampNs); + } + + public static StoreBufferService.LeaderQueueNode leaderQueueNode( + PubSubMessage consumerRecord, + StoreIngestionTask ingestionTask, + String kafkaUrl, + long beforeProcessingRecordTimestampNs, + LeaderProducedRecordContext leaderProducedRecordContext) { + return new StoreBufferService.LeaderQueueNode( + consumerRecord, + ingestionTask, + kafkaUrl, + beforeProcessingRecordTimestampNs, + leaderProducedRecordContext); + } + + public static Class queueNodeClass() { + return StoreBufferService.QueueNode.class; + } + + public static Class leaderQueueNodeClass() { + return StoreBufferService.LeaderQueueNode.class; + } + + private SBSQueueNodeFactory() { + // Utility class + } +} diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/memory/ClassSizeEstimatorTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/memory/ClassSizeEstimatorTest.java new file mode 100644 index 0000000000..7493887198 --- /dev/null +++ b/internal/venice-common/src/test/java/com/linkedin/venice/memory/ClassSizeEstimatorTest.java @@ -0,0 +1,396 @@ +package com.linkedin.venice.memory; + +import static com.linkedin.venice.memory.ClassSizeEstimator.getClassOverhead; +import static org.testng.Assert.assertThrows; + +import org.testng.annotations.Test; + + +public class ClassSizeEstimatorTest extends HeapSizeEstimatorTest { + public ClassSizeEstimatorTest() { + super(SubSubClassWithThreePrimitiveBooleanFields.class); + } + + @Test + public void testParamValidation() { + assertThrows(NullPointerException.class, () -> getClassOverhead(null)); + } + + @Test(dataProvider = "testMethodologies") + public void testClassOverhead(TestMethodology tm) { + printHeader(tm.resultsTableHeader); + + TestFunction tf = tm.tfProvider.apply(this); + + // Most basic case... just a plain Object. + tf.test(Object.class, 0); + + // Ensure that inheritance (in and of itself) adds no overhead. + tf.test(SubclassOfObjectWithNoFields.class, 0); + + // Ensure that one public primitive fields within a single class is accounted. + tf.test(ClassWithOnePublicPrimitiveBooleanField.class, BOOLEAN_SIZE); + tf.test(ClassWithOnePublicPrimitiveByteField.class, Byte.BYTES); + tf.test(ClassWithOnePublicPrimitiveCharField.class, Character.BYTES); + tf.test(ClassWithOnePublicPrimitiveShortField.class, Short.BYTES); + tf.test(ClassWithOnePublicPrimitiveIntField.class, Integer.BYTES); + tf.test(ClassWithOnePublicPrimitiveFloatField.class, Float.BYTES); + tf.test(ClassWithOnePublicPrimitiveLongField.class, Long.BYTES); + tf.test(ClassWithOnePublicPrimitiveDoubleField.class, Double.BYTES); + + // Ensure that two private primitive fields within a single class are accounted. 
+ tf.test(ClassWithTwoPrimitiveBooleanFields.class, BOOLEAN_SIZE * 2); + tf.test(ClassWithTwoPrimitiveByteFields.class, Byte.BYTES * 2); + tf.test(ClassWithTwoPrimitiveCharFields.class, Character.BYTES * 2); + tf.test(ClassWithTwoPrimitiveShortFields.class, Short.BYTES * 2); + tf.test(ClassWithTwoPrimitiveIntFields.class, Integer.BYTES * 2); + tf.test(ClassWithTwoPrimitiveFloatFields.class, Float.BYTES * 2); + tf.test(ClassWithTwoPrimitiveLongFields.class, Long.BYTES * 2); + tf.test(ClassWithTwoPrimitiveDoubleFields.class, Double.BYTES * 2); + + // Ensure that a mix of public and private fields across the class hierarchy are accounted. + if (JAVA_MAJOR_VERSION < 15) { + // TODO: Plug in correct expected field size for these JVMs... + tf.test(SubClassWithTwoPrimitiveBooleanFields.class); + tf.test(SubClassWithTwoPrimitiveByteFields.class); + tf.test(SubClassWithTwoPrimitiveCharFields.class); + tf.test(SubClassWithTwoPrimitiveShortFields.class); + + tf.test(SubSubClassWithThreePrimitiveBooleanFields.class); + tf.test(SubSubClassWithThreePrimitiveByteFields.class); + tf.test(SubSubClassWithThreePrimitiveCharFields.class); + tf.test(SubSubClassWithThreePrimitiveShortFields.class); + } else { + tf.test(SubClassWithTwoPrimitiveBooleanFields.class, BOOLEAN_SIZE * 2); + tf.test(SubClassWithTwoPrimitiveByteFields.class, Byte.BYTES * 2); + tf.test(SubClassWithTwoPrimitiveCharFields.class, Character.BYTES * 2); + tf.test(SubClassWithTwoPrimitiveShortFields.class, Short.BYTES * 2); + + tf.test(SubSubClassWithThreePrimitiveBooleanFields.class, BOOLEAN_SIZE * 3); + tf.test(SubSubClassWithThreePrimitiveByteFields.class, Byte.BYTES * 3); + tf.test(SubSubClassWithThreePrimitiveCharFields.class, Character.BYTES * 3); + tf.test(SubSubClassWithThreePrimitiveShortFields.class, Short.BYTES * 3); + } + + tf.test(SubClassWithTwoPrimitiveIntFields.class, Integer.BYTES * 2); + tf.test(SubClassWithTwoPrimitiveFloatFields.class, Float.BYTES * 2); + tf.test(SubClassWithTwoPrimitiveLongFields.class, Long.BYTES * 2); + tf.test(SubClassWithTwoPrimitiveDoubleFields.class, Double.BYTES * 2); + + tf.test(SubSubClassWithThreePrimitiveIntFields.class, Integer.BYTES * 3); + tf.test(SubSubClassWithThreePrimitiveFloatFields.class, Float.BYTES * 3); + tf.test(SubSubClassWithThreePrimitiveLongFields.class, Long.BYTES * 3); + tf.test(SubSubClassWithThreePrimitiveDoubleFields.class, Double.BYTES * 3); + + // Ensure that pointers are properly accounted. + tf.test(ClassWithThreeObjectPointers.class, POINTER_SIZE * 3); + + // Ensure that arrays are properly accounted. + tf.test(ClassWithArray.class, POINTER_SIZE); + tf.test(Object[].class, () -> new Object[0]); + tf.test(boolean[].class, () -> new boolean[0]); + tf.test(byte[].class, () -> new byte[0]); + tf.test(char[].class, () -> new char[0]); + tf.test(short[].class, () -> new short[0]); + tf.test(int[].class, () -> new int[0]); + tf.test(float[].class, () -> new float[0]); + tf.test(long[].class, () -> new long[0]); + tf.test(double[].class, () -> new double[0]); + + /** + * Ensure that field packing and ordering is accounted. + * + * Note that we don't actually do anything special about packing and ordering, and things still work. The ordering + * can have implications in terms of padding fields to protect against false sharing, but does not appear to have + * a concrete effect on memory utilization (or at least, none of the current test cases expose such issue). + */ + tf.test(FieldPacking.class); + tf.test(FieldOrder.class); + + // Put it all together... 
+ tf.test(ComplexClass.class, POINTER_SIZE * 10); + + printResultSeparatorLine(); + } + + private static class SubclassOfObjectWithNoFields { + public SubclassOfObjectWithNoFields() { + } + } + + private static class ClassWithOnePublicPrimitiveBooleanField { + public boolean publicField; + + public ClassWithOnePublicPrimitiveBooleanField() { + } + } + + private static class ClassWithOnePublicPrimitiveByteField { + public byte publicField; + + public ClassWithOnePublicPrimitiveByteField() { + } + } + + private static class ClassWithOnePublicPrimitiveCharField { + public char publicField; + + public ClassWithOnePublicPrimitiveCharField() { + } + } + + private static class ClassWithOnePublicPrimitiveShortField { + public short publicField; + + public ClassWithOnePublicPrimitiveShortField() { + } + } + + private static class ClassWithOnePublicPrimitiveIntField { + public int publicField; + + public ClassWithOnePublicPrimitiveIntField() { + } + } + + private static class ClassWithOnePublicPrimitiveFloatField { + public float publicField; + + public ClassWithOnePublicPrimitiveFloatField() { + } + } + + private static class ClassWithOnePublicPrimitiveLongField { + public long publicField; + + public ClassWithOnePublicPrimitiveLongField() { + } + } + + private static class ClassWithOnePublicPrimitiveDoubleField { + public double publicField; + + public ClassWithOnePublicPrimitiveDoubleField() { + } + } + + private static class ClassWithTwoPrimitiveBooleanFields { + private boolean field1, field2; + + public ClassWithTwoPrimitiveBooleanFields() { + } + } + + private static class ClassWithTwoPrimitiveByteFields { + private byte field1, field2; + + public ClassWithTwoPrimitiveByteFields() { + } + } + + private static class ClassWithTwoPrimitiveCharFields { + private char field1, field2; + + public ClassWithTwoPrimitiveCharFields() { + } + } + + private static class ClassWithTwoPrimitiveShortFields { + private short field1, field2; + + public ClassWithTwoPrimitiveShortFields() { + } + } + + private static class ClassWithTwoPrimitiveIntFields { + private int field1, field2; + + public ClassWithTwoPrimitiveIntFields() { + } + } + + private static class ClassWithTwoPrimitiveFloatFields { + private float field1, field2; + + public ClassWithTwoPrimitiveFloatFields() { + } + } + + private static class ClassWithTwoPrimitiveLongFields { + private long field1, field2; + + public ClassWithTwoPrimitiveLongFields() { + } + } + + private static class ClassWithTwoPrimitiveDoubleFields { + private double field1, field2; + + public ClassWithTwoPrimitiveDoubleFields() { + } + } + + private static class SubClassWithTwoPrimitiveBooleanFields extends ClassWithOnePublicPrimitiveBooleanField { + private boolean privateField; + + public SubClassWithTwoPrimitiveBooleanFields() { + } + } + + private static class SubClassWithTwoPrimitiveByteFields extends ClassWithOnePublicPrimitiveByteField { + private byte privateField; + + public SubClassWithTwoPrimitiveByteFields() { + } + } + + private static class SubClassWithTwoPrimitiveCharFields extends ClassWithOnePublicPrimitiveCharField { + private char privateField; + + public SubClassWithTwoPrimitiveCharFields() { + } + } + + private static class SubClassWithTwoPrimitiveShortFields extends ClassWithOnePublicPrimitiveShortField { + private short privateField; + + public SubClassWithTwoPrimitiveShortFields() { + } + } + + private static class SubClassWithTwoPrimitiveIntFields extends ClassWithOnePublicPrimitiveIntField { + private int privateField; + + public 
SubClassWithTwoPrimitiveIntFields() { + } + } + + private static class SubClassWithTwoPrimitiveFloatFields extends ClassWithOnePublicPrimitiveFloatField { + private float privateField; + + public SubClassWithTwoPrimitiveFloatFields() { + } + } + + private static class SubClassWithTwoPrimitiveLongFields extends ClassWithOnePublicPrimitiveLongField { + private long privateField; + + public SubClassWithTwoPrimitiveLongFields() { + } + } + + private static class SubClassWithTwoPrimitiveDoubleFields extends ClassWithOnePublicPrimitiveDoubleField { + private double privateField; + + public SubClassWithTwoPrimitiveDoubleFields() { + } + } + + private static class SubSubClassWithThreePrimitiveBooleanFields extends SubClassWithTwoPrimitiveBooleanFields { + private boolean privateField; + + public SubSubClassWithThreePrimitiveBooleanFields() { + } + } + + private static class SubSubClassWithThreePrimitiveByteFields extends SubClassWithTwoPrimitiveByteFields { + private byte privateField; + + public SubSubClassWithThreePrimitiveByteFields() { + } + } + + private static class SubSubClassWithThreePrimitiveCharFields extends SubClassWithTwoPrimitiveCharFields { + private char privateField; + + public SubSubClassWithThreePrimitiveCharFields() { + } + } + + private static class SubSubClassWithThreePrimitiveShortFields extends SubClassWithTwoPrimitiveShortFields { + private short privateField; + + public SubSubClassWithThreePrimitiveShortFields() { + } + } + + private static class SubSubClassWithThreePrimitiveIntFields extends SubClassWithTwoPrimitiveIntFields { + private int privateField; + + public SubSubClassWithThreePrimitiveIntFields() { + } + } + + private static class SubSubClassWithThreePrimitiveFloatFields extends SubClassWithTwoPrimitiveFloatFields { + private float privateField; + + public SubSubClassWithThreePrimitiveFloatFields() { + } + } + + private static class SubSubClassWithThreePrimitiveLongFields extends SubClassWithTwoPrimitiveLongFields { + private long privateField; + + public SubSubClassWithThreePrimitiveLongFields() { + } + } + + private static class SubSubClassWithThreePrimitiveDoubleFields extends SubClassWithTwoPrimitiveDoubleFields { + private double privateField; + + public SubSubClassWithThreePrimitiveDoubleFields() { + } + } + + private static class ClassWithThreeObjectPointers { + Object field1, field2, field3; + + public ClassWithThreeObjectPointers() { + } + } + + private static class ClassWithArray { + public Object[] array; + + public ClassWithArray() { + } + } + + /** See: https://shipilev.net/jvm/objects-inside-out/#_field_packing */ + private static class FieldPacking { + boolean b; + long l; + char c; + int i; + + public FieldPacking() { + } + } + + /** See: https://shipilev.net/jvm/objects-inside-out/#_observation_field_declaration_order_field_layout_order */ + private static class FieldOrder { + boolean firstField; + long secondField; + char thirdField; + int fourthField; + + public FieldOrder() { + } + } + + private static class ComplexClass { + ClassWithTwoPrimitiveBooleanFields field1; + ClassWithTwoPrimitiveByteFields field2; + ClassWithTwoPrimitiveCharFields field3; + ClassWithTwoPrimitiveShortFields field4; + ClassWithTwoPrimitiveIntFields field5; + ClassWithTwoPrimitiveFloatFields field6; + ClassWithTwoPrimitiveLongFields field7; + ClassWithTwoPrimitiveDoubleFields field8; + ClassWithThreeObjectPointers field9; + ClassWithArray field10; + + public ComplexClass() { + } + } +} diff --git 
a/internal/venice-common/src/test/java/com/linkedin/venice/memory/HeapSizeEstimatorTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/memory/HeapSizeEstimatorTest.java new file mode 100644 index 0000000000..7099bf2848 --- /dev/null +++ b/internal/venice-common/src/test/java/com/linkedin/venice/memory/HeapSizeEstimatorTest.java @@ -0,0 +1,292 @@ +package com.linkedin.venice.memory; + +import static com.linkedin.venice.memory.ClassSizeEstimator.getClassOverhead; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.fail; + +import com.linkedin.venice.utils.Utils; +import java.lang.reflect.Constructor; +import java.util.function.Function; +import java.util.function.Supplier; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.DataProvider; + + +public abstract class HeapSizeEstimatorTest { + private static final Logger LOGGER = LogManager.getLogger(HeapSizeEstimatorTest.class); + /** + * Some scenarios are tricky to compute dynamically without just copy-pasting the whole main code, so we just skip it + * for now, though we could come back to it later... + */ + private static final int SKIP_EXPECTED_FIELD_OVERHEAD = -1; + private static final Runtime RUNTIME = Runtime.getRuntime(); + private static final int NUMBER_OF_ALLOCATIONS_WHEN_MEASURING = 200_000; + protected static final int JAVA_MAJOR_VERSION = Utils.getJavaMajorVersion(); + protected static final int BOOLEAN_SIZE = 1; + private static final int ALIGNMENT_SIZE; + protected static final int OBJECT_HEADER_SIZE; + protected static final int ARRAY_HEADER_SIZE; + protected static final int POINTER_SIZE; + + static { + // This duplicates the main code, which is not ideal, but there isn't much choice if we want the test to run in + // various JVM scenarios... + boolean is64bitsJVM = ClassSizeEstimator.is64bitsJVM(); + int markWordSize = is64bitsJVM ? 8 : 4; + boolean isCompressedOopsEnabled = ClassSizeEstimator.isUseCompressedOopsEnabled(); + boolean isCompressedKlassPointersEnabled = ClassSizeEstimator.isCompressedKlassPointersEnabled(); + int classPointerSize = isCompressedKlassPointersEnabled ? 4 : 8; + + ALIGNMENT_SIZE = is64bitsJVM ? 8 : 4; + OBJECT_HEADER_SIZE = markWordSize + classPointerSize; + ARRAY_HEADER_SIZE = roundUpToNearestAlignment(OBJECT_HEADER_SIZE + Integer.BYTES); + POINTER_SIZE = isCompressedOopsEnabled ? 
4 : 8; + } + + private final int[] resultRowCellLengths; + + protected HeapSizeEstimatorTest(Class classWithLongestName) { + this.resultRowCellLengths = new int[] { 6, classWithLongestName.getSimpleName().length(), 9, 9 }; + } + + protected interface TestFunction { + default void test(Class c) { + test(c, SKIP_EXPECTED_FIELD_OVERHEAD, null); + } + + default void test(Class c, Supplier constructor) { + test(c, SKIP_EXPECTED_FIELD_OVERHEAD, constructor); + } + + default void test(Class c, int expectedFieldOverhead) { + test(c, expectedFieldOverhead, null); + } + + void test(Class c, int expectedFieldOverhead, Supplier constructor); + } + + protected enum TestMethodology { + THEORETICAL_EXPECTATION( + o -> o::theoreticalExpectation, new String[] { "status", "class name", "predicted", "expected" } + ), + EMPIRICAL_MEASUREMENT( + o -> o::empiricalClassMeasurement, new String[] { "status", "class name", "predicted", "allocated" } + ); + + final Function tfProvider; + final String[] resultsTableHeader; + + TestMethodology(Function tfProvider, String[] resultsTableHeader) { + this.tfProvider = tfProvider; + this.resultsTableHeader = resultsTableHeader; + } + } + + @DataProvider + public Object[][] testMethodologies() { + return new Object[][] { { TestMethodology.THEORETICAL_EXPECTATION }, { TestMethodology.EMPIRICAL_MEASUREMENT } }; + } + + @BeforeTest + public void preTest() { + System.gc(); + } + + protected void theoreticalExpectation(Class c, int expectedFieldOverhead, Supplier constructor) { + int predictedClassOverhead = getClassOverhead(c); + int expectedClassOverheadWithoutAlignment = OBJECT_HEADER_SIZE + expectedFieldOverhead; + if (expectedFieldOverhead != SKIP_EXPECTED_FIELD_OVERHEAD) { + int expectedClassOverhead = roundUpToNearestAlignment(expectedClassOverheadWithoutAlignment); + boolean success = predictedClassOverhead == expectedClassOverhead; + printResultRow( + status(success, 1, 1), + c.getSimpleName(), + String.valueOf(predictedClassOverhead), + String.valueOf(expectedClassOverhead)); + assertEquals(predictedClassOverhead, expectedClassOverhead); + } + } + + private void empiricalClassMeasurement(Class c, int expectedFieldOverhead, Supplier constructor) { + empiricalMeasurement(c, getClassOverhead(c), constructor); + } + + protected void empiricalMeasurement(Class c, int predictedUsage, Supplier constructor) { + /** + * The reason for having multiple attempts is that the allocation measurement method is not always reliable. + * Presumably, this is because GC could kick in during the middle of the allocation loop. If the allocated memory + * is negative then for sure it's not right. If the GC reduces memory allocated but not enough to make the + * measurement go negative, then we cannot know if it's a measurement error, or a bug... In any case, we will do + * a few attempts and assume that the measurement is good if it falls within the prescribed delta (even though + * technically that could be a false negative). 
+ */ + int currentAttempt = 0; + int totalAttempts = 3; + while (currentAttempt++ < totalAttempts) { + assertNotEquals(RUNTIME.maxMemory(), Long.MAX_VALUE); + Object[] allocations = new Object[NUMBER_OF_ALLOCATIONS_WHEN_MEASURING]; + + if (constructor == null) { + Class[] argTypes = new Class[0]; + Object[] args = new Object[0]; + Constructor reflectiveConstructor; + try { + reflectiveConstructor = c.getConstructor(argTypes); + } catch (NoSuchMethodException e) { + fail("Could not get a no-arg constructor for " + c.getSimpleName(), e); + throw new RuntimeException(e); + } + + constructor = () -> { + try { + return reflectiveConstructor.newInstance(args); + } catch (Exception e) { + fail("Could not invoke the no-arg constructor for " + c.getSimpleName(), e); + } + return null; // Unreachable code, just to appease the compiler + }; + } + + long memoryAllocatedBeforeInstantiations = getCurrentlyAllocatedMemory(); + + for (int i = 0; i < NUMBER_OF_ALLOCATIONS_WHEN_MEASURING; i++) { + allocations[i] = constructor.get(); + } + + long memoryAllocatedAfterInstantiations = getCurrentlyAllocatedMemory(); + long memoryAllocatedByInstantiations = memoryAllocatedAfterInstantiations - memoryAllocatedBeforeInstantiations; + if (memoryAllocatedByInstantiations < 0) { + String errorMessage = "Memory allocated is negative! memoryAllocatedBeforeInstantiations: " + + memoryAllocatedBeforeInstantiations + "; memoryAllocatedAfterInstantiations: " + + memoryAllocatedAfterInstantiations + "; memoryAllocatedByInstantiations: " + + memoryAllocatedByInstantiations + "; " + currentAttempt + " attempts left."; + if (currentAttempt < totalAttempts) { + LOGGER.info(errorMessage); + continue; + } else { + fail(errorMessage); + } + } + + double memoryAllocatedPerInstance = + (double) memoryAllocatedByInstantiations / (double) NUMBER_OF_ALLOCATIONS_WHEN_MEASURING; + + for (int i = 0; i < NUMBER_OF_ALLOCATIONS_WHEN_MEASURING; i++) { + assertNotNull(allocations[i]); + } + + // Since the above method for measuring allocated memory is imperfect, we need to tolerate some delta. + double allocatedToPredictedRatio = memoryAllocatedPerInstance / (double) predictedUsage; + double delta = Math.abs(1 - allocatedToPredictedRatio); + + // For small objects, any delta of 1 byte or less will be tolerated + double minimumAbsoluteDeltaInBytes = 1; + double minimumAbsoluteDelta = minimumAbsoluteDeltaInBytes / memoryAllocatedPerInstance; + + // For larger objects, we'll tolerate up to 1% delta + double minimumRelativeDelta = 0.01; + + // The larger of the two deltas is the one we use + double maxAllowedDelta = Math.max(minimumAbsoluteDelta, minimumRelativeDelta); + + boolean success = delta < maxAllowedDelta; + + printResultRow( + status(success, currentAttempt, totalAttempts), + c.getSimpleName(), + String.valueOf(predictedUsage), + String.format("%.3f", memoryAllocatedPerInstance)); + + // A best-effort attempt to minimize the chance of needing to GC in the middle of the next measurement run... + allocations = null; + System.gc(); + + if (success) { + break; // No more attempts needed if the allocation measurement and all assertions succeeded + } else if (currentAttempt == totalAttempts) { + fail( + "Class " + c.getSimpleName() + " has a memoryAllocatedPerInstance (" + memoryAllocatedPerInstance + + ") which is too far from the predictedUsage (" + predictedUsage + "); delta: " + + String.format("%.3f", delta) + "; maxAllowedDelta: " + String.format("%.3f", maxAllowedDelta) + + ". 
No more attempts left."); + } + } + } + + /** Different algo than the main code because why not? It should be equivalent... */ + protected static int roundUpToNearestAlignment(int size) { + double numberOfAlignmentWindowsFittingWithinTheSize = (double) size / ALIGNMENT_SIZE; + double roundedUp = Math.ceil(numberOfAlignmentWindowsFittingWithinTheSize); + int finalSize = (int) roundedUp * ALIGNMENT_SIZE; + return finalSize; + } + + private static long getCurrentlyAllocatedMemory() { + System.gc(); + return RUNTIME.maxMemory() - RUNTIME.freeMemory(); + } + + @BeforeClass + public void printEnvironmentInfo() { + LOGGER.info("Java major version: " + JAVA_MAJOR_VERSION); + LOGGER.info("Alignment size: " + ALIGNMENT_SIZE); + LOGGER.info("Object header size: " + OBJECT_HEADER_SIZE); + LOGGER.info("Array header size: " + ARRAY_HEADER_SIZE); + LOGGER.info("Pointer size: " + POINTER_SIZE); + } + + protected void printHeader(String... headerCells) { + printResultSeparatorLine(); + printResultRow(headerCells); + printResultSeparatorLine(); + } + + protected void printResultSeparatorLine() { + StringBuilder sb = new StringBuilder(); + sb.append(" "); + for (int i = 0; i < this.resultRowCellLengths.length; i++) { + sb.append("+-"); + for (int j = 0; j < this.resultRowCellLengths[i] + 1; j++) { + sb.append('-'); + } + } + sb.append("+"); + LOGGER.info(sb.toString()); + } + + private static String status(boolean success, int currentAttempt, int totalAttempts) { + String symbol = new String(Character.toChars(success ? 0x2705 : 0x274C)); + if (totalAttempts == 1) { + return symbol; + } + return symbol + " " + currentAttempt + "/" + totalAttempts; + } + + private void printResultRow(String... cells) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < cells.length; i++) { + sb.append(" | "); + String cell = cells[i]; + sb.append(cell); + + int remainder = this.resultRowCellLengths[i] - cell.length(); + for (Character character: cell.toCharArray()) { + if (Character.UnicodeBlock.of(character) != Character.UnicodeBlock.BASIC_LATIN) { + // Emoticons take two characters' width + remainder--; + } + } + + for (int j = 0; j < remainder; j++) { + sb.append(' '); + } + } + sb.append(" |"); + LOGGER.info(sb.toString()); + } +} diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/memory/InstanceSizeEstimatorTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/memory/InstanceSizeEstimatorTest.java new file mode 100644 index 0000000000..095390ca5b --- /dev/null +++ b/internal/venice-common/src/test/java/com/linkedin/venice/memory/InstanceSizeEstimatorTest.java @@ -0,0 +1,152 @@ +package com.linkedin.venice.memory; + +import static com.linkedin.venice.kafka.protocol.enums.MessageType.PUT; + +import com.linkedin.davinci.kafka.consumer.LeaderProducedRecordContext; +import com.linkedin.davinci.kafka.consumer.SBSQueueNodeFactory; +import com.linkedin.venice.kafka.protocol.GUID; +import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope; +import com.linkedin.venice.kafka.protocol.LeaderMetadata; +import com.linkedin.venice.kafka.protocol.ProducerMetadata; +import com.linkedin.venice.kafka.protocol.Put; +import com.linkedin.venice.message.KafkaKey; +import com.linkedin.venice.pubsub.ImmutablePubSubMessage; +import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl; +import com.linkedin.venice.pubsub.PubSubTopicRepository; +import com.linkedin.venice.pubsub.api.PubSubMessage; +import com.linkedin.venice.pubsub.api.PubSubTopicPartition; +import 
com.linkedin.venice.writer.VeniceWriter; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.function.BiFunction; +import java.util.function.Supplier; +import org.testng.annotations.Test; + + +public class InstanceSizeEstimatorTest extends HeapSizeEstimatorTest { + public InstanceSizeEstimatorTest() { + super(LeaderProducedRecordContext.class); + } + + @Test + public void testInstanceMeasurement() { + printHeader(TestMethodology.EMPIRICAL_MEASUREMENT.resultsTableHeader); + + // Try various array sizes to take alignment into account. + List> kafkaKeySuppliers = new ArrayList<>(); + kafkaKeySuppliers.add(() -> new KafkaKey(PUT, new byte[0])); + kafkaKeySuppliers.add(() -> new KafkaKey(PUT, new byte[2])); + kafkaKeySuppliers.add(() -> new KafkaKey(PUT, new byte[4])); + kafkaKeySuppliers.add(() -> new KafkaKey(PUT, new byte[6])); + kafkaKeySuppliers.add(() -> new KafkaKey(PUT, new byte[8])); + kafkaKeySuppliers.add(() -> new KafkaKey(PUT, new byte[10])); + + for (Supplier kafkaKeySupplier: kafkaKeySuppliers) { + empiricalInstanceMeasurement(KafkaKey.class, kafkaKeySupplier); + } + + Supplier producerMetadataSupplier = () -> new ProducerMetadata(new GUID(), 0, 0, 0L, 0L); + empiricalInstanceMeasurement(ProducerMetadata.class, producerMetadataSupplier); + + Supplier rtPutSupplier = () -> new Put(ByteBuffer.allocate(10), 1, -1, null); + Supplier vtPutSupplier = () -> { + byte[] rawKafkaValue = new byte[50]; + return new Put(ByteBuffer.wrap(rawKafkaValue, 10, 10), 1, 1, ByteBuffer.wrap(rawKafkaValue, 25, 10)); + }; + List> putSuppliers = new ArrayList<>(); + putSuppliers.add(rtPutSupplier); + putSuppliers.add(vtPutSupplier); + for (Supplier putSupplier: putSuppliers) { + empiricalInstanceMeasurement(Put.class, putSupplier); + } + + PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository(); + /** The {@link PubSubTopicPartition} is supposed to be a shared instance, but it cannot be null. */ + PubSubTopicPartition pubSubTopicPartition = + new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("topic"), 0); + + VeniceWriter.DefaultLeaderMetadata defaultLeaderMetadata = new VeniceWriter.DefaultLeaderMetadata("blah"); + + List> kmeSuppliers = new ArrayList<>(); + Supplier rtKmeSupplier = () -> new KafkaMessageEnvelope( + // What a message in the RT topic might look like + PUT.getValue(), + producerMetadataSupplier.get(), + rtPutSupplier.get(), + // The (improperly-named) "leader" metadata footer is always populated, but in this path it points to a + // static instance. + defaultLeaderMetadata); + Supplier vtKmeSupplier = () -> new KafkaMessageEnvelope( + PUT.getValue(), + producerMetadataSupplier.get(), + vtPutSupplier.get(), + new LeaderMetadata( + null, // shared instance + 0L, + 0)); + kmeSuppliers.add(rtKmeSupplier); + kmeSuppliers.add(vtKmeSupplier); + // TODO: Add updates, deletes... 
+ + for (Supplier kmeSupplier: kmeSuppliers) { + empiricalInstanceMeasurement(KafkaMessageEnvelope.class, kmeSupplier); + } + + BiFunction, Supplier, PubSubMessage> psmProvider = + (kafkaKeySupplier, kmeSupplier) -> new ImmutablePubSubMessage<>( + kafkaKeySupplier.get(), + kmeSupplier.get(), + pubSubTopicPartition, + 0, + 0, + 0); + + for (Supplier kafkaKeySupplier: kafkaKeySuppliers) { + for (Supplier kmeSupplier: kmeSuppliers) { + empiricalInstanceMeasurement( + ImmutablePubSubMessage.class, + () -> psmProvider.apply(kafkaKeySupplier, kmeSupplier)); + } + } + + for (Supplier kafkaKeySupplier: kafkaKeySuppliers) { + for (Supplier kmeSupplier: kmeSuppliers) { + empiricalInstanceMeasurement( + SBSQueueNodeFactory.queueNodeClass(), + () -> SBSQueueNodeFactory.queueNode(psmProvider.apply(kafkaKeySupplier, kmeSupplier), null, null, 0)); + } + } + + int kafkaClusterId = 0; + Supplier leaderProducedRecordContextSupplierForPut = + () -> LeaderProducedRecordContext.newPutRecord(kafkaClusterId, 0, new byte[10], vtPutSupplier.get()); + List> leaderProducedRecordContextSuppliers = new ArrayList<>(); + leaderProducedRecordContextSuppliers.add(leaderProducedRecordContextSupplierForPut); + + for (Supplier leaderProducedRecordContextSupplier: leaderProducedRecordContextSuppliers) { + empiricalInstanceMeasurement(LeaderProducedRecordContext.class, leaderProducedRecordContextSupplier); + } + + for (Supplier kafkaKeySupplier: kafkaKeySuppliers) { + for (Supplier leaderProducedRecordContextSupplier: leaderProducedRecordContextSuppliers) { + empiricalInstanceMeasurement( + SBSQueueNodeFactory.leaderQueueNodeClass(), + () -> SBSQueueNodeFactory.leaderQueueNode( + psmProvider.apply(kafkaKeySupplier, vtKmeSupplier), + null, + null, + 0, + leaderProducedRecordContextSupplier.get())); + } + } + + printResultSeparatorLine(); + } + + private void empiricalInstanceMeasurement(Class c, Supplier constructor) { + Object o = constructor.get(); + int expectedSize = InstanceSizeEstimator.getObjectSize(o); + empiricalMeasurement(c, expectedSize, constructor); + } +} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/PubSubProducerCallbackSimpleImpl.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/PubSubProducerCallbackSimpleImpl.java similarity index 100% rename from internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/PubSubProducerCallbackSimpleImpl.java rename to internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/PubSubProducerCallbackSimpleImpl.java diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/ApacheKafkaUtilsTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/ApacheKafkaUtilsTest.java index a9f3ccd86b..ad19ceafbb 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/ApacheKafkaUtilsTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/ApacheKafkaUtilsTest.java @@ -2,12 +2,16 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertSame; +import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; +import com.linkedin.venice.pubsub.api.EmptyPubSubMessageHeaders; import com.linkedin.venice.pubsub.api.PubSubMessageHeader; import com.linkedin.venice.pubsub.api.PubSubMessageHeaders; import java.util.LinkedHashMap; import java.util.Map; +import 
java.util.function.Supplier; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeaders; import org.testng.annotations.Test; @@ -15,10 +19,22 @@ public class ApacheKafkaUtilsTest { @Test - public void testConvertToKafkaSpecificHeadersWhenPubsubMessageHeadersIsNull() { - RecordHeaders recordHeaders = ApacheKafkaUtils.convertToKafkaSpecificHeaders(null); - assertNotNull(recordHeaders); - assertEquals(recordHeaders.toArray().length, 0); + public void testConvertToKafkaSpecificHeadersWhenPubsubMessageHeadersIsNullOrEmpty() { + Supplier[] suppliers = + new Supplier[] { () -> null, () -> new PubSubMessageHeaders(), () -> EmptyPubSubMessageHeaders.SINGLETON }; + for (Supplier supplier: suppliers) { + RecordHeaders recordHeaders = ApacheKafkaUtils.convertToKafkaSpecificHeaders(supplier.get()); + assertNotNull(recordHeaders); + assertEquals(recordHeaders.toArray().length, 0); + assertThrows(() -> recordHeaders.add("foo", "bar".getBytes())); + + RecordHeaders recordHeaders2 = ApacheKafkaUtils.convertToKafkaSpecificHeaders(supplier.get()); + assertNotNull(recordHeaders2); + assertEquals(recordHeaders2.toArray().length, 0); + assertThrows(() -> recordHeaders2.add("foo", "bar".getBytes())); + + assertSame(recordHeaders, recordHeaders2); + } } @Test diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubProducerAdapterConcurrentDelegatorTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubProducerAdapterConcurrentDelegatorTest.java index b9ca3f20df..07f84d26ce 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubProducerAdapterConcurrentDelegatorTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubProducerAdapterConcurrentDelegatorTest.java @@ -85,7 +85,7 @@ public void testProducerQueueNode() { new PubSubMessageHeaders(), null, null); - assertTrue(node.getSize() > 0); + assertTrue(node.getHeapSize() > 0); } @Test diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/utils/DictionaryUtilsTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/utils/DictionaryUtilsTest.java index 55b2846a9f..2f587671cd 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/utils/DictionaryUtilsTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/utils/DictionaryUtilsTest.java @@ -46,7 +46,7 @@ public void testGetDictionary() { PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(topic, 0); doReturn(topic).when(pubSubTopicRepository).getTopic(topic.getName()); - KafkaKey controlMessageKey = new KafkaKey(MessageType.CONTROL_MESSAGE, null); + KafkaKey controlMessageKey = new KafkaKey(MessageType.CONTROL_MESSAGE, new byte[0]); StartOfPush startOfPush = new StartOfPush(); startOfPush.compressionStrategy = CompressionStrategy.ZSTD_WITH_DICT.getValue(); startOfPush.compressionDictionary = ByteBuffer.wrap(dictionaryToSend); @@ -77,7 +77,7 @@ public void testGetDictionaryReturnsNullWhenNoDictionary() { PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(topic, 0); doReturn(topic).when(pubSubTopicRepository).getTopic(topic.getName()); - KafkaKey controlMessageKey = new KafkaKey(MessageType.CONTROL_MESSAGE, null); + KafkaKey controlMessageKey = new KafkaKey(MessageType.CONTROL_MESSAGE, new byte[0]); StartOfPush startOfPush = new StartOfPush(); ControlMessage sopCM = new ControlMessage(); @@ -134,7 +134,7 @@ public void 
testGetDictionaryWaitsTillTopicHasRecords() { PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(topic, 0); doReturn(topic).when(pubSubTopicRepository).getTopic(topic.getName()); - KafkaKey controlMessageKey = new KafkaKey(MessageType.CONTROL_MESSAGE, null); + KafkaKey controlMessageKey = new KafkaKey(MessageType.CONTROL_MESSAGE, new byte[0]); StartOfPush startOfPush = new StartOfPush(); startOfPush.compressionStrategy = CompressionStrategy.ZSTD_WITH_DICT.getValue(); startOfPush.compressionDictionary = ByteBuffer.wrap(dictionaryToSend); diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/utils/collections/MemoryBoundBlockingQueueTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/utils/collections/MemoryBoundBlockingQueueTest.java index a390dbfdba..e4286a2abb 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/utils/collections/MemoryBoundBlockingQueueTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/utils/collections/MemoryBoundBlockingQueueTest.java @@ -1,6 +1,6 @@ package com.linkedin.venice.utils.collections; -import com.linkedin.venice.common.Measurable; +import com.linkedin.venice.memory.Measurable; import com.linkedin.venice.utils.TestUtils; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -13,7 +13,7 @@ private static class MeasurableObject implements Measurable { public static final int SIZE = 10; @Override - public int getSize() { + public int getHeapSize() { return SIZE; } } @@ -23,7 +23,7 @@ public void testPut() throws InterruptedException { int memoryCap = 5000; MemoryBoundBlockingQueue queue = new MemoryBoundBlockingQueue<>(memoryCap, 1000); int objectCntAtMost = - memoryCap / (MemoryBoundBlockingQueue.LINKED_QUEUE_NODE_OVERHEAD_IN_BYTE + MeasurableObject.SIZE); + memoryCap / (MemoryBoundBlockingQueue.LINKED_LIST_NODE_SHALLOW_OVERHEAD + MeasurableObject.SIZE); Thread t = new Thread(() -> { while (true) { try { @@ -48,7 +48,7 @@ public void testTake() throws InterruptedException { int memoryCap = 5000; MemoryBoundBlockingQueue queue = new MemoryBoundBlockingQueue<>(memoryCap, 1000); int objectCntAtMost = - memoryCap / (MemoryBoundBlockingQueue.LINKED_QUEUE_NODE_OVERHEAD_IN_BYTE + MeasurableObject.SIZE); + memoryCap / (MemoryBoundBlockingQueue.LINKED_LIST_NODE_SHALLOW_OVERHEAD + MeasurableObject.SIZE); for (int i = 0; i < objectCntAtMost; ++i) { queue.put(new MeasurableObject()); } @@ -81,7 +81,7 @@ public void testThrottling() throws InterruptedException { int notifyDelta = 1000; MemoryBoundBlockingQueue queue = new MemoryBoundBlockingQueue<>(memoryCap, notifyDelta); int objectCntAtMost = - memoryCap / (MemoryBoundBlockingQueue.LINKED_QUEUE_NODE_OVERHEAD_IN_BYTE + MeasurableObject.SIZE); + memoryCap / (MemoryBoundBlockingQueue.LINKED_LIST_NODE_SHALLOW_OVERHEAD + MeasurableObject.SIZE); Thread t = new Thread(() -> { while (true) { try { @@ -102,7 +102,7 @@ public void testThrottling() throws InterruptedException { int previousQueueSize = queue.size(); // Here we need to take out some objects to allow more put double objectCntTakenAtLeast = Math.ceil( - (double) notifyDelta / (MemoryBoundBlockingQueue.LINKED_QUEUE_NODE_OVERHEAD_IN_BYTE + MeasurableObject.SIZE)); + (double) notifyDelta / (MemoryBoundBlockingQueue.LINKED_LIST_NODE_SHALLOW_OVERHEAD + MeasurableObject.SIZE)); for (int i = 1; i < objectCntTakenAtLeast; ++i) { queue.take(); Assert.assertEquals(queue.size(), previousQueueSize - 1); diff --git 
a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/PubSubHelper.java b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/PubSubHelper.java index ba268eb134..1fe429d10c 100644 --- a/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/PubSubHelper.java +++ b/internal/venice-test-common/src/main/java/com/linkedin/venice/utils/PubSubHelper.java @@ -168,5 +168,10 @@ public long getTimestampBeforeProduce() { public long getTimestampAfterProduce() { return timestampAfterProduce; } + + @Override + public int getHeapSize() { + throw new UnsupportedOperationException("getHeapSize is not supported on " + this.getClass().getSimpleName()); + } } }
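The pattern this change applies in every class that now implements Measurable is the same: cache the shallow per-class overhead once via ClassSizeEstimator.getClassOverhead(...), then have getHeapSize() add the retained size of only the fields the instance exclusively owns, while fields that point at shared instances are deliberately left out. A minimal sketch of that pattern, assuming only the getClassOverhead and InstanceSizeEstimator.getSize(byte[]) calls already used in this diff; ExampleQueuePayload itself is hypothetical and not part of the change:

import static com.linkedin.venice.memory.ClassSizeEstimator.getClassOverhead;

import com.linkedin.venice.memory.InstanceSizeEstimator;
import com.linkedin.venice.memory.Measurable;

/** Hypothetical payload class, for illustration only. */
class ExampleQueuePayload implements Measurable {
  /** The shallow overhead is a per-class constant, so it is computed once and cached. */
  private static final int SHALLOW_CLASS_OVERHEAD = getClassOverhead(ExampleQueuePayload.class);

  private final byte[] key; // exclusively owned by this instance, so its size is counted
  private final String topicName; // assumed to point at a shared instance, so it is not counted

  ExampleQueuePayload(byte[] key, String topicName) {
    this.key = key;
    this.topicName = topicName;
  }

  @Override
  public int getHeapSize() {
    // Shallow size of this object plus the deep size of the fields it exclusively owns.
    return SHALLOW_CLASS_OVERHEAD + InstanceSizeEstimator.getSize(this.key);
  }
}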
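MemoryBoundBlockingQueue is the main consumer of getHeapSize(): each put() is charged the record's reported heap size plus LINKED_LIST_NODE_SHALLOW_OVERHEAD for the backing LinkedList node, and put() blocks once the configured byte budget is used up. A rough usage sketch under the (capacityInBytes, notifyDeltaInBytes) constructor shape shown in the tests above; FixedSizeItem is hypothetical:

import com.linkedin.venice.memory.Measurable;
import com.linkedin.venice.utils.collections.MemoryBoundBlockingQueue;

class MemoryBoundQueueUsageSketch {
  /** Hypothetical item that claims to retain roughly 100 bytes. */
  private static class FixedSizeItem implements Measurable {
    @Override
    public int getHeapSize() {
      return 100;
    }
  }

  static void fillQueue() throws InterruptedException {
    // 10 KB budget; blocked producers are only re-checked after ~1 KB has been freed by take().
    MemoryBoundBlockingQueue<FixedSizeItem> queue = new MemoryBoundBlockingQueue<>(10_000, 1_000);
    int perRecordCost = 100 + MemoryBoundBlockingQueue.LINKED_LIST_NODE_SHALLOW_OVERHEAD;
    int recordsThatFit = 10_000 / perRecordCost;
    for (int i = 0; i < recordsThatFit; i++) {
      queue.put(new FixedSizeItem()); // each put() is charged getHeapSize() + node overhead
    }
    // One more put() would now block until consumers take() enough records to free space.
  }
}

The notify delta appears to batch producer wake-ups: as the throttling test above suggests, consumers must free at least that many bytes before blocked put() calls are re-checked.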
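MeasurableLinkedHashMap cannot see the backing table that java.util.HashMap allocates, so it guesses the table's capacity by replaying HashMap's growth rule (initial capacity 16, load factor 0.75, doubling) against the current size(). A standalone, hypothetical restatement of that guess, for illustration only:

final class TableSizeGuessSketch {
  /** Mirrors MeasurableLinkedHashMap#guessCurrentlyAllocatedTableSize() for a given entry count. */
  static int guessHashTableCapacity(int mapSize) {
    if (mapSize == 0) {
      return 0; // an empty map is assumed to have never allocated its table
    }
    int capacity = 16; // HashMap's default initial capacity
    int threshold = (int) (0.75f * capacity); // HashMap's default load factor
    while (capacity <= 1 << 30) {
      if (mapSize < threshold) {
        return capacity;
      }
      capacity <<= 1; // HashMap doubles its table when the threshold is crossed
      threshold <<= 1;
    }
    return 1 << 30; // MAXIMUM_CAPACITY
  }
}

For example, guessHashTableCapacity(20) returns 32, since 20 entries exceed the threshold of 12 at capacity 16 but stay under the threshold of 24 at capacity 32.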
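The expected values in ClassSizeEstimatorTest come from standard object-layout arithmetic: the object header plus the declared field bytes, rounded up to the JVM's alignment. A worked example using constants typical of a 64-bit JVM with compressed class pointers enabled; the tests derive the real values from ClassSizeEstimator at runtime, so these particular numbers are assumptions rather than guarantees:

final class LayoutMathSketch {
  // Assumed constants for a common 64-bit JVM with compressed class pointers enabled.
  static final int OBJECT_HEADER_SIZE = 12;
  static final int ALIGNMENT_SIZE = 8;

  /** Round up to the next multiple of the alignment, matching the intent of the test helper. */
  static int roundUpToNearestAlignment(int size) {
    return ((size + ALIGNMENT_SIZE - 1) / ALIGNMENT_SIZE) * ALIGNMENT_SIZE;
  }

  public static void main(String[] args) {
    // A class with two int fields: 12 (header) + 2 * 4 (fields) = 20 bytes, padded to 24.
    System.out.println(roundUpToNearestAlignment(OBJECT_HEADER_SIZE + 2 * Integer.BYTES));
  }
}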