Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SSDP 192] Use the schema registry to derive a stream schema in the kafka source #7

Conversation

albanovito
Copy link

##What
Fetch the avro schema from schema registry for each topic and save the schema in the Airbyte Catalog.

##How
In the discovery method call a Schema Registry to fetch the correct schema message with an API Confluent, and after transform it in the Json format and save in the Airbyte Catalog.

Recommended reading order

  1. x.java
  2. y.python

🚨 User Impact 🚨

Are there any breaking changes? What is the end result perceived by the user?

For connector PRs, use this section to explain which type of semantic versioning bump occurs as a result of the changes. Refer to our Semantic Versioning for Connectors guidelines for more information. Breaking changes to connectors must be documented by an Airbyte engineer (PR author, or reviewer for community PRs) by using the Breaking Change Release Playbook.

If there are breaking changes, please merge this PR with the 🚨🚨 emoji so changelog authors can further highlight this if needed.

Pre-merge Actions

Expand the relevant checklist and delete the others.

New Connector

Community member or Airbyter

  • Community member? Grant edit access to maintainers (instructions)
  • Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Connector version is set to 0.0.1
    • Dockerfile has version 0.0.1
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • docs/integrations/<source or destination>/<name>.md including changelog with an entry for the initial version. See changelog example
    • docs/integrations/README.md

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • If new credentials are required for use in CI, add them to GSM. Instructions.
Updating a connector

Community member or Airbyter

  • Grant edit access to maintainers (instructions)
  • Unit & integration tests added

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • If new credentials are required for use in CI, add them to GSM. Instructions.
Connector Generator
  • Issue acceptance criteria met
  • PR name follows PR naming conventions
  • If adding a new generator, add it to the list of scaffold modules being tested
  • The generator test modules (all connectors with -scaffold in their name) have been updated with the latest scaffold by running ./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates then checking in your changes
  • Documentation which references the generator is updated as needed

@albanovito albanovito changed the title [Ssdp 192] Use the schema registry to derive a stream schema in the kafka source [SSDP 192] Use the schema registry to derive a stream schema in the kafka source Jun 7, 2023
@albanovito albanovito marked this pull request as ready for review June 8, 2023 07:49
@albanovito albanovito requested a review from a team June 8, 2023 07:50
Copy link

@luisvicenteatprima luisvicenteatprima left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tests are pretty comprehensive. But I have two questions;

  1. Have we tested using the schema registry in staging?
  2. Have we tested this with schema registry subjects that have references?

*/
private static final Map<String, List<String>> AVRO_TO_JSON_DATA_TYPE_MAPPING = Map.ofEntries(
entry("null", List.of("null")),
entry("boolean", List.of("boolean", "null")),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are they translated to nullable fields? F.i. A boolean field on avro would not accept null as a value unless it's type is [boolean, null] - afaik.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, we can map Avro and Json type one by one, if in Avro there are double options we can set double option in json too.

}

public JsonNode convertoToAirbyteJson(String avroSchema) throws Exception {
Map<String, Object> mapAvroSchema = mapper.readValue(avroSchema, new TypeReference<>() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Airbyte provices a helper class that we can use to serialise/deserialise Json - io.airbyte.commons.json.Jsons.

* @return Map<String, Object> map with Json struct
* @throws Exception
*/
public Map<String, Object> convertoToAirbyteJson(Map<String, Object> avroSchema) throws Exception {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a first version this is fine, but I do wonder if there is a library that provides a AvroSchema class that we could use instead of a Map.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing we can do is to extract the different paths to private methods, i.e. one to deal with lists, one with maps (records)

assertInstanceOf(AvroFormat.class, kafkaFormat);
}

// @Test

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we remove commented lines?


ObjectMapper mapper = new ObjectMapper();

String avroSimpleSchema = """

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should move all this string literals to resource files to keep the test class a bit tidier.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, it's better. I'll do

@albanovito
Copy link
Author

The tests are pretty comprehensive. But I have two questions;

  1. Have we tested using the schema registry in staging?
  2. Have we tested this with schema registry subjects that have references?
  1. I used the schema registry in It-Staging environment on Confluent.
  2. I used this topic with both message and schema saved in avro format

@luisvicenteatprima
Copy link

I don't have permissions for the italian account. But I thought we were going to get rid of that account because it was created just for evaluation purposes. If that schema doesn't have type references, we should do a test with references.

@albanovito
Copy link
Author

I don't have permissions for the italian account. But I thought we were going to get rid of that account because it was created just for evaluation purposes. If that schema doesn't have type references, we should do a test with references.

Ok, in Italy we have the permission to create topic and token. Can you give me the staging permissions so I can try them?

@albanovito albanovito force-pushed the SSDP-192/task/use-the-schema-registry-to-derive-a-stream-schema-in-the-kafka-source branch 2 times, most recently from ce55eac to 8c39acb Compare June 21, 2023 07:26
Implement a simply converto to avro in json schema and use it in the discovery method to getch the schema from Confluent Schema Registry
@albanovito albanovito force-pushed the SSDP-192/task/use-the-schema-registry-to-derive-a-stream-schema-in-the-kafka-source branch from 8c39acb to be7986f Compare June 22, 2023 14:49
@luisvicenteatprima luisvicenteatprima force-pushed the master branch 3 times, most recently from aae7401 to 069e9af Compare July 4, 2023 10:10
@luisvicenteatprima luisvicenteatprima force-pushed the master branch 2 times, most recently from f38d69f to ba32568 Compare July 17, 2023 13:10
@danielevergara danielevergara removed their request for review November 3, 2023 08:32
@albanovito albanovito closed this Jan 13, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants