Skip to content

Commit

Permalink
[fix][client] Cache empty schema version in ProducerImpl schemaCache. (
Browse files Browse the repository at this point in the history
…apache#19929)

Co-authored-by: wangjinlong <[email protected]>
(cherry picked from commit cff3f9b)
(cherry picked from commit a46acef)
  • Loading branch information
lifepuzzlefun authored and nicoloboschi committed Jun 6, 2023
1 parent 4f39556 commit c02ecfc
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandCloseProducerHook;
import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandConnectHook;
import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandFlowHook;
import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandGetOrCreateSchemaHook;
import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandPartitionLookupHook;
import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandProducerHook;
import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandSendHook;
Expand All @@ -55,6 +56,7 @@
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.api.proto.CommandConnect;
import org.apache.pulsar.common.api.proto.CommandFlow;
import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchema;
import org.apache.pulsar.common.api.proto.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
Expand All @@ -77,6 +79,7 @@
import org.slf4j.LoggerFactory;

/**
*
*/
public class MockBrokerService {
private LookupData lookupData;
Expand Down Expand Up @@ -244,6 +247,19 @@ protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
ctx.writeAndFlush(Commands.newSuccess(closeConsumer.getRequestId()));
}

@Override
protected void handleGetOrCreateSchema(CommandGetOrCreateSchema commandGetOrCreateSchema) {
if (handleGetOrCreateSchema != null) {
handleGetOrCreateSchema.apply(ctx, commandGetOrCreateSchema);
return;
}

// default
ctx.writeAndFlush(
Commands.newGetOrCreateSchemaResponse(commandGetOrCreateSchema.getRequestId(),
SchemaVersion.Empty));
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.warn("Got exception", cause);
Expand Down Expand Up @@ -276,6 +292,7 @@ final protected void handlePong(CommandPong pong) {
private CommandUnsubscribeHook handleUnsubscribe = null;
private CommandCloseProducerHook handleCloseProducer = null;
private CommandCloseConsumerHook handleCloseConsumer = null;
private CommandGetOrCreateSchemaHook handleGetOrCreateSchema = null;

public MockBrokerService() {
server = new Server(0);
Expand Down Expand Up @@ -416,6 +433,10 @@ public void setHandleCloseConsumer(CommandCloseConsumerHook hook) {
handleCloseConsumer = hook;
}

public void setHandleGetOrCreateSchema(CommandGetOrCreateSchemaHook hook) {
handleGetOrCreateSchema = hook;
}

public void resetHandleCloseConsumer() {
handleCloseConsumer = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.api.proto.CommandConnect;
import org.apache.pulsar.common.api.proto.CommandFlow;
import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchema;
import org.apache.pulsar.common.api.proto.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.CommandProducer;
Expand Down Expand Up @@ -77,4 +78,8 @@ interface CommandCloseProducerHook {
interface CommandCloseConsumerHook {
void apply(ChannelHandlerContext ctx, CommandCloseConsumer closeConsumer);
}

interface CommandGetOrCreateSchemaHook {
void apply(ChannelHandlerContext ctx, CommandGetOrCreateSchema closeConsumer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import lombok.Cleanup;
import org.apache.pulsar.client.api.MockBrokerService;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.testng.Assert.assertEquals;

@Test(groups = "broker-impl")
public class ProducerEmptySchemaCacheTest {

MockBrokerService mockBrokerService;

@BeforeClass(alwaysRun = true)
public void setup() {
mockBrokerService = new MockBrokerService();
mockBrokerService.start();
}

@AfterClass(alwaysRun = true)
public void teardown() {
mockBrokerService.stop();
}

@Test
public void testProducerShouldCacheEmptySchema() throws Exception {
@Cleanup
PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder()
.serviceUrl(mockBrokerService.getBrokerAddress())
.build();

AtomicLong counter = new AtomicLong(0);

mockBrokerService.setHandleGetOrCreateSchema((ctx, commandGetOrCreateSchema) -> {
counter.incrementAndGet();
ctx.writeAndFlush(
Commands.newGetOrCreateSchemaResponse(commandGetOrCreateSchema.getRequestId(),
SchemaVersion.Empty));
});

// this schema mode is used in consumer retry and dlq Producer
// when the origin consumer has Schema.BYTES schema
// and when retry message or dlq message is send
// will use typed message builder set Schema.Bytes to send message.

Schema<byte[]> schema = Schema.BYTES;
Schema<byte[]> readerSchema = Schema.BYTES;

@Cleanup
Producer<byte[]> dlqProducer = client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema))
.topic("testAutoProduceBytesSchemaShouldCache")
.sendTimeout(5, TimeUnit.SECONDS)
.maxPendingMessages(0)
.enableBatching(false)
.create();

for (int i = 10; i > 0; i--) {
TypedMessageBuilder<byte[]> typedMessageBuilderNew =
dlqProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(readerSchema))
.value("hello".getBytes());

typedMessageBuilderNew.send();
}

// schema should only be requested once.
// and if the schemaVersion is empty (e.g. Schema.BYTES)
// it should be cached by the client
// to avoid continuously send `CommandGetOrCreateSchema` rpc
assertEquals(counter.get(), 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Commands.ChecksumType;
import org.apache.pulsar.common.protocol.schema.SchemaHash;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.DateFormatter;
Expand Down Expand Up @@ -670,11 +671,16 @@ boolean populateMessageSchema(MessageImpl msg, SendCallback callback) {
completeCallbackAndReleaseSemaphore(msg.getUncompressedSize(), callback, e);
return false;
}

byte[] schemaVersion = schemaCache.get(msg.getSchemaHash());
if (schemaVersion != null) {
msgMetadataBuilder.setSchemaVersion(schemaVersion);
if (schemaVersion != SchemaVersion.Empty.bytes()) {
msgMetadataBuilder.setSchemaVersion(schemaVersion);
}

msg.setSchemaState(MessageImpl.SchemaState.Ready);
}

return true;
}

Expand All @@ -683,7 +689,11 @@ private boolean rePopulateMessageSchema(MessageImpl msg) {
if (schemaVersion == null) {
return false;
}
msg.getMessageBuilder().setSchemaVersion(schemaVersion);

if (schemaVersion != SchemaVersion.Empty.bytes()) {
msg.getMessageBuilder().setSchemaVersion(schemaVersion);
}

msg.setSchemaState(MessageImpl.SchemaState.Ready);
return true;
}
Expand All @@ -706,12 +716,15 @@ private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback call
}
} else {
log.info("[{}] [{}] GetOrCreateSchema succeed", topic, producerName);
// In broker, if schema version is an empty byte array, it means the topic doesn't have schema. In this
// case, we should not cache the schema version so that the schema version of the message metadata will
// be null, instead of an empty array.
// In broker, if schema version is an empty byte array, it means the topic doesn't have schema.
// In this case, we cache the schema version to `SchemaVersion.Empty.bytes()`.
// When we need to set the schema version of the message metadata,
// we should check if the cached schema version is `SchemaVersion.Empty.bytes()`
if (v.length != 0) {
schemaCache.putIfAbsent(msg.getSchemaHash(), v);
msg.getMessageBuilder().setSchemaVersion(v);
} else {
schemaCache.putIfAbsent(msg.getSchemaHash(), SchemaVersion.Empty.bytes());
}
msg.setSchemaState(MessageImpl.SchemaState.Ready);
}
Expand Down

0 comments on commit c02ecfc

Please sign in to comment.