diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java index b16fac427aa7c..e240bd773993b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java @@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandCloseProducerHook; import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandConnectHook; import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandFlowHook; +import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandGetOrCreateSchemaHook; import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandPartitionLookupHook; import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandProducerHook; import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandSendHook; @@ -55,6 +56,7 @@ import org.apache.pulsar.common.api.proto.CommandCloseProducer; import org.apache.pulsar.common.api.proto.CommandConnect; import org.apache.pulsar.common.api.proto.CommandFlow; +import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchema; import org.apache.pulsar.common.api.proto.CommandLookupTopic; import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType; import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata; @@ -77,6 +79,7 @@ import org.slf4j.LoggerFactory; /** + * */ public class MockBrokerService { private LookupData lookupData; @@ -244,6 +247,19 @@ protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) { ctx.writeAndFlush(Commands.newSuccess(closeConsumer.getRequestId())); } + @Override + protected void handleGetOrCreateSchema(CommandGetOrCreateSchema commandGetOrCreateSchema) { + if (handleGetOrCreateSchema != null) { + handleGetOrCreateSchema.apply(ctx, commandGetOrCreateSchema); + return; + } + + // default + ctx.writeAndFlush( + Commands.newGetOrCreateSchemaResponse(commandGetOrCreateSchema.getRequestId(), + SchemaVersion.Empty)); + } + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.warn("Got exception", cause); @@ -276,6 +292,7 @@ final protected void handlePong(CommandPong pong) { private CommandUnsubscribeHook handleUnsubscribe = null; private CommandCloseProducerHook handleCloseProducer = null; private CommandCloseConsumerHook handleCloseConsumer = null; + private CommandGetOrCreateSchemaHook handleGetOrCreateSchema = null; public MockBrokerService() { server = new Server(0); @@ -416,6 +433,10 @@ public void setHandleCloseConsumer(CommandCloseConsumerHook hook) { handleCloseConsumer = hook; } + public void setHandleGetOrCreateSchema(CommandGetOrCreateSchemaHook hook) { + handleGetOrCreateSchema = hook; + } + public void resetHandleCloseConsumer() { handleCloseConsumer = null; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerServiceHooks.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerServiceHooks.java index a68e2d5c32c30..3f9f535e7963e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerServiceHooks.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerServiceHooks.java @@ -26,6 +26,7 @@ import org.apache.pulsar.common.api.proto.CommandCloseProducer; import org.apache.pulsar.common.api.proto.CommandConnect; import org.apache.pulsar.common.api.proto.CommandFlow; +import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchema; import org.apache.pulsar.common.api.proto.CommandLookupTopic; import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata; import org.apache.pulsar.common.api.proto.CommandProducer; @@ -77,4 +78,8 @@ interface CommandCloseProducerHook { interface CommandCloseConsumerHook { void apply(ChannelHandlerContext ctx, CommandCloseConsumer closeConsumer); } + + interface CommandGetOrCreateSchemaHook { + void apply(ChannelHandlerContext ctx, CommandGetOrCreateSchema closeConsumer); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerEmptySchemaCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerEmptySchemaCacheTest.java new file mode 100644 index 0000000000000..079b8bd8eccbd --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerEmptySchemaCacheTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import lombok.Cleanup; +import org.apache.pulsar.client.api.MockBrokerService; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.protocol.schema.SchemaVersion; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.testng.Assert.assertEquals; + +@Test(groups = "broker-impl") +public class ProducerEmptySchemaCacheTest { + + MockBrokerService mockBrokerService; + + @BeforeClass(alwaysRun = true) + public void setup() { + mockBrokerService = new MockBrokerService(); + mockBrokerService.start(); + } + + @AfterClass(alwaysRun = true) + public void teardown() { + mockBrokerService.stop(); + } + + @Test + public void testProducerShouldCacheEmptySchema() throws Exception { + @Cleanup + PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder() + .serviceUrl(mockBrokerService.getBrokerAddress()) + .build(); + + AtomicLong counter = new AtomicLong(0); + + mockBrokerService.setHandleGetOrCreateSchema((ctx, commandGetOrCreateSchema) -> { + counter.incrementAndGet(); + ctx.writeAndFlush( + Commands.newGetOrCreateSchemaResponse(commandGetOrCreateSchema.getRequestId(), + SchemaVersion.Empty)); + }); + + // this schema mode is used in consumer retry and dlq Producer + // when the origin consumer has Schema.BYTES schema + // and when retry message or dlq message is send + // will use typed message builder set Schema.Bytes to send message. + + Schema schema = Schema.BYTES; + Schema readerSchema = Schema.BYTES; + + @Cleanup + Producer dlqProducer = client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema)) + .topic("testAutoProduceBytesSchemaShouldCache") + .sendTimeout(5, TimeUnit.SECONDS) + .maxPendingMessages(0) + .enableBatching(false) + .create(); + + for (int i = 10; i > 0; i--) { + TypedMessageBuilder typedMessageBuilderNew = + dlqProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(readerSchema)) + .value("hello".getBytes()); + + typedMessageBuilderNew.send(); + } + + // schema should only be requested once. + // and if the schemaVersion is empty (e.g. Schema.BYTES) + // it should be cached by the client + // to avoid continuously send `CommandGetOrCreateSchema` rpc + assertEquals(counter.get(), 1); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 486891bbd0e8d..f2334650ad8e1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -87,6 +87,7 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Commands.ChecksumType; import org.apache.pulsar.common.protocol.schema.SchemaHash; +import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.DateFormatter; @@ -670,11 +671,16 @@ boolean populateMessageSchema(MessageImpl msg, SendCallback callback) { completeCallbackAndReleaseSemaphore(msg.getUncompressedSize(), callback, e); return false; } + byte[] schemaVersion = schemaCache.get(msg.getSchemaHash()); if (schemaVersion != null) { - msgMetadataBuilder.setSchemaVersion(schemaVersion); + if (schemaVersion != SchemaVersion.Empty.bytes()) { + msgMetadataBuilder.setSchemaVersion(schemaVersion); + } + msg.setSchemaState(MessageImpl.SchemaState.Ready); } + return true; } @@ -683,7 +689,11 @@ private boolean rePopulateMessageSchema(MessageImpl msg) { if (schemaVersion == null) { return false; } - msg.getMessageBuilder().setSchemaVersion(schemaVersion); + + if (schemaVersion != SchemaVersion.Empty.bytes()) { + msg.getMessageBuilder().setSchemaVersion(schemaVersion); + } + msg.setSchemaState(MessageImpl.SchemaState.Ready); return true; } @@ -706,12 +716,15 @@ private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback call } } else { log.info("[{}] [{}] GetOrCreateSchema succeed", topic, producerName); - // In broker, if schema version is an empty byte array, it means the topic doesn't have schema. In this - // case, we should not cache the schema version so that the schema version of the message metadata will - // be null, instead of an empty array. + // In broker, if schema version is an empty byte array, it means the topic doesn't have schema. + // In this case, we cache the schema version to `SchemaVersion.Empty.bytes()`. + // When we need to set the schema version of the message metadata, + // we should check if the cached schema version is `SchemaVersion.Empty.bytes()` if (v.length != 0) { schemaCache.putIfAbsent(msg.getSchemaHash(), v); msg.getMessageBuilder().setSchemaVersion(v); + } else { + schemaCache.putIfAbsent(msg.getSchemaHash(), SchemaVersion.Empty.bytes()); } msg.setSchemaState(MessageImpl.SchemaState.Ready); }