From 4a2fb3418a1b283e7c00d31c775e7b82d71a961e Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 16 May 2024 18:02:17 +0800 Subject: [PATCH] [fix][schema] Error checking schema compatibility on a schema-less topic via REST API (#22720) (cherry picked from commit 101aee4) --- .../AvroSchemaBasedCompatibilityCheck.java | 6 ++-- ...rotobufNativeSchemaCompatibilityCheck.java | 4 ++- .../schema/SchemaRegistryServiceImpl.java | 2 +- .../IncompatibleSchemaException.java | 4 +++ .../broker/admin/AdminApiSchemaTest.java | 32 +++++++++++++++++++ 5 files changed, 44 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java index ce4bb2a49688c..1f1635e9347ea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/AvroSchemaBasedCompatibilityCheck.java @@ -64,8 +64,10 @@ public void checkCompatible(Iterable from, SchemaData to, SchemaComp log.warn("Error during schema parsing: {}", e.getMessage()); throw new IncompatibleSchemaException(e); } catch (SchemaValidationException e) { - log.warn("Error during schema compatibility check: {}", e.getMessage()); - throw new IncompatibleSchemaException(e); + String msg = String.format("Error during schema compatibility check with strategy %s: %s: %s", + strategy, e.getClass().getName(), e.getMessage()); + log.warn(msg); + throw new IncompatibleSchemaException(msg, e); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java index 585453705659a..da3dd8138fc61 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ProtobufNativeSchemaCompatibilityCheck.java @@ -67,7 +67,9 @@ public void checkCompatible(Iterable from, SchemaData to, SchemaComp private void checkRootMessageChange(Descriptor fromDescriptor, Descriptor toDescriptor, SchemaCompatibilityStrategy strategy) throws IncompatibleSchemaException { if (!fromDescriptor.getFullName().equals(toDescriptor.getFullName())) { - throw new IncompatibleSchemaException("Protobuf root message isn't allow change!"); + throw new IncompatibleSchemaException("Protobuf root message change is not allowed under the '" + + strategy + "' strategy. Original message name: '" + fromDescriptor.getFullName() + + "', new message name: '" + toDescriptor.getFullName() + "'."); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java index 9cfcce419fd67..5f1c67e7bdcfa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java @@ -394,7 +394,7 @@ private CompletableFuture checkCompatibilityWithLatest(String schemaId, Sc } return result; } else { - return FutureUtils.exception(new IncompatibleSchemaException("Do not have existing schema.")); + return CompletableFuture.completedFuture(null); } }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/IncompatibleSchemaException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/IncompatibleSchemaException.java index 759cc08e3ff5d..a4731161b2940 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/IncompatibleSchemaException.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/exceptions/IncompatibleSchemaException.java @@ -33,6 +33,10 @@ public IncompatibleSchemaException(String message) { super(message); } + public IncompatibleSchemaException(String message, Throwable e) { + super(message, e); + } + public IncompatibleSchemaException(Throwable e) { super(e); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java index 6d5f0e5b48913..4d545d141ebd7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java @@ -49,6 +49,8 @@ import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.protocol.schema.IsCompatibilityResponse; +import org.apache.pulsar.common.protocol.schema.PostSchemaPayload; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaInfoWithVersion; import org.awaitility.Awaitility; @@ -415,4 +417,34 @@ public void testGetSchemaCompatibilityStrategyWhenSetBrokerLevelAndSchemaAutoUpd admin.namespaces().getSchemaCompatibilityStrategy(schemaCompatibilityNamespace), SchemaCompatibilityStrategy.UNDEFINED)); } + + @Test + public void testCompatibilityWithEmpty() throws Exception { + List> checkSchemas = List.of( + Schema.STRING, + Schema.JSON(SchemaDefinition.builder().withPojo(Foo.class).withProperties(PROPS).build()), + Schema.AVRO(SchemaDefinition.builder().withPojo(Foo.class).withProperties(PROPS).build()), + Schema.KeyValue(Schema.STRING, Schema.STRING) + ); + for (Schema schema : checkSchemas) { + SchemaInfo schemaInfo = schema.getSchemaInfo(); + String topicName = schemaCompatibilityNamespace + "/testCompatibilityWithEmpty"; + PostSchemaPayload postSchemaPayload = new PostSchemaPayload(schemaInfo.getType().toString(), + schemaInfo.getSchemaDefinition(), new HashMap<>()); + + // check compatibility with empty schema + IsCompatibilityResponse isCompatibilityResponse = + admin.schemas().testCompatibility(topicName, postSchemaPayload); + assertTrue(isCompatibilityResponse.isCompatibility()); + assertEquals(isCompatibilityResponse.getSchemaCompatibilityStrategy(), SchemaCompatibilityStrategy.FULL.name()); + + // set schema compatibility strategy is FULL_TRANSITIVE to cover checkCompatibilityWithAll + admin.namespaces().setSchemaCompatibilityStrategy(schemaCompatibilityNamespace, SchemaCompatibilityStrategy.FULL_TRANSITIVE); + isCompatibilityResponse = admin.schemas().testCompatibility(topicName, postSchemaPayload); + assertTrue(isCompatibilityResponse.isCompatibility()); + assertEquals(isCompatibilityResponse.getSchemaCompatibilityStrategy(), SchemaCompatibilityStrategy.FULL_TRANSITIVE.name()); + // set back to FULL + admin.namespaces().setSchemaCompatibilityStrategy(schemaCompatibilityNamespace, SchemaCompatibilityStrategy.FULL); + } + } }