diff --git a/.github/workflows/streaming-tests.yml b/.github/workflows/streaming-tests.yml
index 89595fdd73..1fe99e0af7 100644
--- a/.github/workflows/streaming-tests.yml
+++ b/.github/workflows/streaming-tests.yml
@@ -51,7 +51,7 @@ jobs:
         brew install mysql
         PATH=$PATH:/usr/local/mysql/bin
     - name: Work around Homebrew MySQL being broken
-      # See https://github.com/Homebrew/homebrew-core/issues/130258 for more details.
+      # See https://github.com/Homebrew/homebrew-core/issues/130258 for more details.
       if: startsWith(matrix.os, 'macOS')
       run: |
         brew install zlib
diff --git a/.github/workflows/test-python-universal-spark.yml b/.github/workflows/test-python-universal-spark.yml
index 3b27cfc36e..d2ae316741 100644
--- a/.github/workflows/test-python-universal-spark.yml
+++ b/.github/workflows/test-python-universal-spark.yml
@@ -1,42 +1,112 @@
 name: test-python-universal-spark
-on:
-  repository_dispatch:
-    branches: [ "confluent_avro_changes", "master" ]
-  workflow_dispatch:
-    branches: [ "confluent_avro_changes", "master" ]
+on: [pull_request, workflow_dispatch]
+# concurrency is currently broken, see details https://github.com/actions/runner/issues/1532
+#concurrency:
+#  group: pr-integration-tests-${{ github.event.pull_request.number }}
+#  cancel-in-progress: true
 jobs:
-  build:
-
+  build-docker-image:
+    # all jobs MUST have this if check for 'ok-to-test' or 'approved' for security purposes.
+    if:
+      ((github.event.action == 'labeled' && (github.event.label.name == 'approved' || github.event.label.name == 'lgtm' || github.event.label.name == 'ok-to-test')) ||
+      (github.event.action != 'labeled' && (contains(github.event.pull_request.labels.*.name, 'ok-to-test') || contains(github.event.pull_request.labels.*.name, 'approved') || contains(github.event.pull_request.labels.*.name, 'lgtm')))) &&
+      github.repository == 'feast-dev/feast'
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v3
+        with:
+          # pull_request_target runs the workflow in the context of the base repo
+          # as such actions/checkout needs to be explicit configured to retrieve
+          # code from the PR.
+ ref: refs/pull/${{ github.event.pull_request.number }}/merge + submodules: recursive + - name: Set up QEMU + uses: docker/setup-qemu-action@v1 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + with: + install: true + - name: Set up AWS SDK + uses: aws-actions/configure-aws-credentials@v1 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: us-west-2 + - name: Login to Amazon ECR + id: login-ecr + uses: aws-actions/amazon-ecr-login@v1 + - name: Set ECR image tag + id: image-tag + run: echo "::set-output name=DOCKER_IMAGE_TAG::`git rev-parse HEAD`" + - name: Cache Public ECR Image + id: lambda_python_3_9 + uses: actions/cache@v2 + with: + path: ~/cache + key: lambda_python_3_9 + - name: Handle Cache Miss (pull public ECR image & save it to tar file) + if: steps.cache-primes.outputs.cache-hit != 'true' + run: | + mkdir -p ~/cache + docker pull public.ecr.aws/lambda/python:3.9 + docker save public.ecr.aws/lambda/python:3.9 -o ~/cache/lambda_python_3_9.tar + - name: Handle Cache Hit (load docker image from tar file) + if: steps.cache-primes.outputs.cache-hit == 'true' + run: | + docker load -i ~/cache/lambda_python_3_9.tar + - name: Build and push + env: + ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }} + ECR_REPOSITORY: feast-python-server + run: | + docker build \ + --file sdk/python/feast/infra/feature_servers/aws_lambda/Dockerfile \ + --tag $ECR_REGISTRY/$ECR_REPOSITORY:${{ steps.image-tag.outputs.DOCKER_IMAGE_TAG }} \ + --load \ + . + docker push $ECR_REGISTRY/$ECR_REPOSITORY:${{ steps.image-tag.outputs.DOCKER_IMAGE_TAG }} + outputs: + DOCKER_IMAGE_TAG: ${{ steps.image-tag.outputs.DOCKER_IMAGE_TAG }} + spark-test-python: + # all jobs MUST have this if check for 'ok-to-test' or 'approved' for security purposes. + if: + ((github.event.action == 'labeled' && (github.event.label.name == 'approved' || github.event.label.name == 'lgtm' || github.event.label.name == 'ok-to-test')) || + (github.event.action != 'labeled' && (contains(github.event.pull_request.labels.*.name, 'ok-to-test') || contains(github.event.pull_request.labels.*.name, 'approved') || contains(github.event.pull_request.labels.*.name, 'lgtm')))) && + github.repository == 'feast-dev/feast' + needs: build-docker-image runs-on: ${{ matrix.os }} strategy: fail-fast: false matrix: - python-version: ["3.8"] - os: [ubuntu-latest, macOS-latest] - exclude: - - os: macOS-latest - python-version: "3.8" + python-version: [ "3.9" ] + os: [ ubuntu-latest ] env: OS: ${{ matrix.os }} PYTHON: ${{ matrix.python-version }} + services: + redis: + image: redis + ports: + - 6379:6379 + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 steps: - - name: Increase swapfile - # Increase ubuntu's swapfile to avoid running out of resources which causes the action to terminate - if: startsWith(matrix.os, 'ubuntu') - run: | - sudo swapoff -a - sudo fallocate -l 8G /swapfile - sudo chmod 600 /swapfile - sudo mkswap /swapfile - sudo swapon /swapfile - sudo swapon --show - uses: actions/checkout@v2 + with: + # pull_request_target runs the workflow in the context of the base repo + # as such actions/checkout needs to be explicit configured to retrieve + # code from the PR. 
+ ref: refs/pull/${{ github.event.pull_request.number }}/merge + submodules: recursive - name: Setup Python - id: setup-python uses: actions/setup-python@v2 + id: setup-python with: python-version: ${{ matrix.python-version }} architecture: x64 @@ -45,17 +115,24 @@ jobs: uses: actions/setup-go@v2 with: go-version: 1.19.7 - - name: Install mysql on macOS - if: startsWith(matrix.os, 'macOS') - run: | - brew install mysql - PATH=$PATH:/usr/local/mysql/bin - - name: Work around Homebrew MySQL being broken - # See https://github.com/Homebrew/homebrew-core/issues/130258 for more details. - if: startsWith(matrix.os, 'macOS') - run: | - brew install zlib - ln -sv $(brew --prefix zlib)/lib/libz.dylib $(brew --prefix)/lib/libzlib.dylib + - name: Authenticate to Google Cloud + uses: 'google-github-actions/auth@v1' + with: + credentials_json: '${{ secrets.GCP_SA_KEY }}' + - name: Set up gcloud SDK + uses: google-github-actions/setup-gcloud@v1 + with: + project_id: ${{ secrets.GCP_PROJECT_ID }} + - name: Use gcloud CLI + run: gcloud info + - name: Set up AWS SDK + uses: aws-actions/configure-aws-credentials@v1 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: us-west-2 + - name: Use AWS CLI + run: aws sts get-caller-identity - name: Upgrade pip version run: | pip install --upgrade pip @@ -74,8 +151,7 @@ jobs: restore-keys: | ${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-pip- - name: Install pip-tools - run: | - pip install -U pip-tools + run: pip install pip-tools - name: Install apache-arrow on ubuntu if: matrix.os == 'ubuntu-latest' run: | @@ -92,8 +168,17 @@ jobs: brew install pkg-config - name: Install dependencies run: make install-python-ci-dependencies - - name: Test Spark + - name: Setup Redis Cluster run: | - make test-python-universal-spark - - + docker pull vishnunair/docker-redis-cluster:latest + docker run -d -p 6001:6379 -p 6002:6380 -p 6003:6381 -p 6004:6382 -p 6005:6383 -p 6006:6384 --name redis-cluster vishnunair/docker-redis-cluster + - name: Test python + if: ${{ always() }} # this will guarantee that step won't be canceled and resources won't leak + env: + FEAST_SERVER_DOCKER_IMAGE_TAG: ${{ needs.build-docker-image.outputs.DOCKER_IMAGE_TAG }} + SNOWFLAKE_CI_DEPLOYMENT: ${{ secrets.SNOWFLAKE_CI_DEPLOYMENT }} + SNOWFLAKE_CI_USER: ${{ secrets.SNOWFLAKE_CI_USER }} + SNOWFLAKE_CI_PASSWORD: ${{ secrets.SNOWFLAKE_CI_PASSWORD }} + SNOWFLAKE_CI_ROLE: ${{ secrets.SNOWFLAKE_CI_ROLE }} + SNOWFLAKE_CI_WAREHOUSE: ${{ secrets.SNOWFLAKE_CI_WAREHOUSE }} + run: make test-python-universal-spark \ No newline at end of file diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index ac2f513229..755567fa8a 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -1,6 +1,6 @@ name: unit-tests -on: [pull_request] +on: [pull_request, workflow_dispatch] jobs: unit-test-python: runs-on: ${{ matrix.os }} diff --git a/protos/feast/core/DataFormat.proto b/protos/feast/core/DataFormat.proto index c453e5e4c8..74a1aa4cea 100644 --- a/protos/feast/core/DataFormat.proto +++ b/protos/feast/core/DataFormat.proto @@ -48,6 +48,13 @@ message StreamFormat { string schema_json = 1; } + // Defines options for the avro data format + message ConfluentAvroFormat { + // Optional if used in a File DataSource as schema is embedded in avro file. + // Specifies the schema of the Avro message as JSON string. 
+ string schema_json = 1; + } + message JsonFormat { string schema_json = 1; } @@ -57,5 +64,6 @@ message StreamFormat { AvroFormat avro_format = 1; ProtoFormat proto_format = 2; JsonFormat json_format = 3; + ConfluentAvroFormat confluent_avro_format = 4; } } diff --git a/sdk/python/feast/data_format.py b/sdk/python/feast/data_format.py index 8f3b195e3e..f61beadb64 100644 --- a/sdk/python/feast/data_format.py +++ b/sdk/python/feast/data_format.py @@ -89,6 +89,8 @@ def from_proto(cls, proto): fmt = proto.WhichOneof("format") if fmt == "avro_format": return AvroFormat(schema_json=proto.avro_format.schema_json) + if fmt == "confluent_avro_format": + return ConfluentAvroFormat(schema_json=proto.confluent_avro_format.schema_json) if fmt == "json_format": return JsonFormat(schema_json=proto.json_format.schema_json) if fmt == "proto_format": @@ -115,6 +117,25 @@ def to_proto(self): return StreamFormatProto(avro_format=proto) +class ConfluentAvroFormat(StreamFormat): + """ + Defines the ConfluentAvro streaming data format that encodes data in ConfluentAvro format + """ + + def __init__(self, schema_json: str): + """ + Construct a new ConfluentAvro data format. + + Args: + schema_json: ConfluentAvro schema definition in JSON + """ + self.schema_json = schema_json + + def to_proto(self): + proto = StreamFormatProto.ConfluentAvroFormat(schema_json=self.schema_json) + return StreamFormatProto(confluent_avro_format=proto) + + class JsonFormat(StreamFormat): """ Defines the Json streaming data format that encodes data in Json format diff --git a/sdk/python/feast/expediagroup/pydantic_models/data_source_model.py b/sdk/python/feast/expediagroup/pydantic_models/data_source_model.py index ba17545593..9af87c07ac 100644 --- a/sdk/python/feast/expediagroup/pydantic_models/data_source_model.py +++ b/sdk/python/feast/expediagroup/pydantic_models/data_source_model.py @@ -17,6 +17,7 @@ from feast.expediagroup.pydantic_models.stream_format_model import ( AnyStreamFormat, AvroFormatModel, + ConfluentAvroFormatModel, JsonFormatModel, ProtoFormatModel, ) @@ -230,7 +231,7 @@ def from_data_source( ) -SUPPORTED_MESSAGE_FORMATS = [AvroFormatModel, JsonFormatModel, ProtoFormatModel] +SUPPORTED_MESSAGE_FORMATS = [AvroFormatModel, ConfluentAvroFormatModel, JsonFormatModel, ProtoFormatModel] SUPPORTED_KAFKA_BATCH_SOURCES = [RequestSourceModel, SparkSourceModel] diff --git a/sdk/python/feast/expediagroup/pydantic_models/stream_format_model.py b/sdk/python/feast/expediagroup/pydantic_models/stream_format_model.py index 53987723d2..bbea559f4a 100644 --- a/sdk/python/feast/expediagroup/pydantic_models/stream_format_model.py +++ b/sdk/python/feast/expediagroup/pydantic_models/stream_format_model.py @@ -4,7 +4,7 @@ from pydantic import Field as PydanticField from typing_extensions import Annotated, Self -from feast.data_format import AvroFormat, JsonFormat, ProtoFormat +from feast.data_format import AvroFormat, ConfluentAvroFormat, JsonFormat, ProtoFormat class StreamFormatModel(BaseModel): @@ -63,6 +63,37 @@ def from_stream_format( return cls(schoma=avro_format.schema_json) +class ConfluentAvroFormatModel(StreamFormatModel): + """ + Pydantic Model of a Feast ConfluentAvroFormat. + """ + + format: Literal["ConfluentAvroFormatModel"] = "ConfluentAvroFormatModel" + schoma: str + + def to_stream_format(self) -> ConfluentAvroFormat: + """ + Given a Pydantic ConfluentAvroFormatModel, create and return a ConfluentAvroFormat. + + Returns: + A ConfluentAvroFormat. 
+ """ + return ConfluentAvroFormat(schema_json=self.schoma) + + @classmethod + def from_stream_format( + cls, + confluent_avro_format, + ) -> Self: # type: ignore + """ + Converts a ConfluentAvroFormat object to its pydantic model representation. + + Returns: + A ConfluentAvroFormatModel. + """ + return cls(schoma=confluent_avro_format.schema_json) + + class JsonFormatModel(StreamFormatModel): """ Pydantic Model of a Feast JsonFormat. @@ -128,6 +159,6 @@ def from_stream_format( # https://blog.devgenius.io/deserialize-child-classes-with-pydantic-that-gonna-work-784230e1cf83 # This lets us discriminate child classes of DataSourceModel with type hints. AnyStreamFormat = Annotated[ - Union[AvroFormatModel, JsonFormatModel, ProtoFormatModel], + Union[ConfluentAvroFormatModel, AvroFormatModel, JsonFormatModel, ProtoFormatModel], PydanticField(discriminator="format"), ] diff --git a/sdk/python/feast/expediagroup/schema_registry/schema_registry.py b/sdk/python/feast/expediagroup/schema_registry/schema_registry.py new file mode 100644 index 0000000000..52bfe70174 --- /dev/null +++ b/sdk/python/feast/expediagroup/schema_registry/schema_registry.py @@ -0,0 +1,172 @@ +""" +Wrapper for SchemaRegistryClient, to separate Feast from +the extensive auth and configuration process of +connecting to a SchemaRegistry. + +Copyright 2024 Expedia Group +Author: matcarlin@expediagroup.com +""" + +import json +import os +import tempfile +from typing import Dict + +import requests +from confluent_kafka.schema_registry import RegisteredSchema, SchemaRegistryClient + + +class SchemaRegistry(): + props: Dict[str, str] + kafka_params: Dict[str, str] + schema_registry_config: Dict[str, str] + client: SchemaRegistryClient + + def __init__(self): + pass + + def initialize_client( + self, + user: str, + password: str, + urn: str, + environment: str, + cert_path: str, # https://stackoverflow.com/questions/55203791/python-requests-using-certificate-value-instead-of-path + ) -> None: + """ + Discover a Schema Registry with the provided urn and credentials, + obtain a set of properties for use in Schema Registry calls, + and initialize the SchemaRegistryClient. 
+ """ + + discovery_url = "https://stream-discovery-service-{environment}.rcp.us-east-1.data.{environment}.exp-aws.net/v2/discovery/urn/{urn}".format( + environment=environment, urn=urn + ) + + response = requests.get( + discovery_url, + auth=(user, password), + headers={"Accept": "application/json"}, + verify=cert_path, + ) + + if response.status_code != 200: + raise RuntimeError( + "Discovery API returned unexpected HTTP status: {status}".format( + status=str(response.status_code) + ) + ) + + try: + props = json.loads(response.text) + except (TypeError, UnicodeDecodeError): + raise TypeError( + "Discovery API response did not contain valid json: {response}".format( + response=response.text + ) + ) + + self.props = props + + # write ssl key and cert to disk + ssl_key_file, ssl_key_path = tempfile.mkstemp() + with os.fdopen(ssl_key_file, "w") as f: + f.write(props["serde"]["schema.registry.ssl.keystore.key"]) + + ssl_certificate_file, ssl_certificate_path = tempfile.mkstemp() + with os.fdopen(ssl_certificate_file, "w") as f: + f.write(props["serde"]["schema.registry.ssl.keystore.certificate.chain"]) + + self.kafka_params = { + "kafka.security.protocol": props["security"]["security.protocol"], + "kafka.bootstrap.servers": props["connection"]["bootstrap.servers"], + "subscribe": props["connection"]["topic"], + "startingOffsets": props["connection"]["auto.offset.reset"], + "kafka.ssl.truststore.certificates": props["security"][ + "ssl.truststore.certificates" + ], + "kafka.ssl.keystore.certificate.chain": props["security"][ + "ssl.keystore.certificate.chain" + ], + "kafka.ssl.keystore.key": props["security"]["ssl.keystore.key"], + "kafka.ssl.endpoint.identification.algorithm": props["security"][ + "ssl.endpoint.identification.algorithm" + ], + "kafka.ssl.truststore.type": props["security"]["ssl.truststore.type"], + "kafka.ssl.keystore.type": props["security"]["ssl.keystore.type"], + "kafka.topic": props["connection"]["topic"], + "kafka.schema.registry.url": props["serde"]["schema.registry.url"], + "kafka.schema.registry.topic": props["connection"]["topic"], + "kafka.schema.registry.ssl.keystore.type": props["serde"][ + "schema.registry.ssl.keystore.type" + ], + "kafka.schema.registry.ssl.keystore.certificate.chain": props["serde"][ + "schema.registry.ssl.keystore.certificate.chain" + ], + "kafka.schema.registry.ssl.keystore.key": props["serde"][ + "schema.registry.ssl.keystore.key" + ], + "kafka.schema.registry.ssl.truststore.certificates": props["serde"][ + "schema.registry.ssl.truststore.certificates" + ], + "kafka.schema.registry.ssl.truststore.type": props["serde"][ + "schema.registry.ssl.truststore.type" + ], + "value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy", + } + + self.schema_registry_config = { + "schema.registry.topic": props["connection"]["topic"], + "schema.registry.url": props["serde"]["schema.registry.url"], + "schema.registry.ssl.keystore.type": props["serde"][ + "schema.registry.ssl.keystore.type" + ], + "schema.registry.ssl.keystore.certificate.chain": props["serde"][ + "schema.registry.ssl.keystore.certificate.chain" + ], + "schema.registry.ssl.keystore.key": props["serde"][ + "schema.registry.ssl.keystore.key" + ], + "schema.registry.ssl.truststore.certificates": props["serde"][ + "schema.registry.ssl.truststore.certificates" + ], + "schema.registry.ssl.truststore.type": props["serde"][ + "schema.registry.ssl.truststore.type" + ], + } + + schema_registry_url = props["serde"]["schema.registry.url"] + + self.client = 
SchemaRegistryClient( + { + "url": schema_registry_url, + "ssl.ca.location": cert_path, + "ssl.key.location": ssl_key_path, + "ssl.certificate.location": ssl_certificate_path, + } + ) + + def get_latest_version( + self, + topic_name: str, + ) -> RegisteredSchema: + """ + Get the latest version of the topic. + """ + if not self.client: + raise RuntimeError("Client has not been initialized. Please call initialize_client first.") + + latest_version = self.client.get_latest_version(topic_name) + + return latest_version + + def get_client( + self + ) -> SchemaRegistryClient: + """ + Return the client. + """ + if not self.client: + raise RuntimeError("Client has not been initialized. Please call initialize_client first.") + + return self.client diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index ea55d89988..04901d64e4 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -2,13 +2,16 @@ from typing import List, Optional import pandas as pd +from confluent_kafka.schema_registry.avro import AvroDeserializer from pyspark.sql import DataFrame, SparkSession from pyspark.sql.avro.functions import from_avro from pyspark.sql.functions import col, from_json -from feast.data_format import AvroFormat, JsonFormat +from feast.data_format import AvroFormat, ConfluentAvroFormat, JsonFormat from feast.data_source import KafkaSource, PushMode +from feast.expediagroup.schema_registry.schema_registry import SchemaRegistry from feast.feature_store import FeatureStore +from feast.feature_view import FeatureView from feast.infra.contrib.stream_processor import ( ProcessorConfig, StreamProcessor, @@ -21,6 +24,7 @@ class SparkProcessorConfig(ProcessorConfig): spark_session: SparkSession processing_time: str query_timeout: int + schema_registry_client: Optional[SchemaRegistry] class SparkKafkaProcessor(StreamProcessor): @@ -33,26 +37,34 @@ def __init__( self, *, fs: FeatureStore, - sfv: StreamFeatureView, + sfv: FeatureView, config: ProcessorConfig, preprocess_fn: Optional[MethodType] = None, ): + + # In general, FeatureView may or may not have stream_source, but it must + # have one to use spark kafka processor + if not sfv.stream_source: + raise ValueError("Feature View must have a stream source to use spark streaming.") + if not isinstance(sfv.stream_source, KafkaSource): raise ValueError("data source is not kafka source") if not isinstance( sfv.stream_source.kafka_options.message_format, AvroFormat + ) and not isinstance( + sfv.stream_source.kafka_options.message_format, ConfluentAvroFormat ) and not isinstance( sfv.stream_source.kafka_options.message_format, JsonFormat ): raise ValueError( - "spark streaming currently only supports json or avro format for kafka source schema" + "spark streaming currently only supports json, avro and confluent avro formats for kafka source schema" ) - self.format = ( - "json" - if isinstance(sfv.stream_source.kafka_options.message_format, JsonFormat) - else "avro" - ) + self.format = "avro" + if isinstance(sfv.stream_source.kafka_options.message_format, JsonFormat): + self.format = "json" + elif isinstance(sfv.stream_source.kafka_options.message_format, ConfluentAvroFormat): + self.format = "confluent_avro" if not isinstance(config, SparkProcessorConfig): raise ValueError("config is not spark processor config") @@ -60,9 +72,18 @@ def __init__( self.preprocess_fn = preprocess_fn self.processing_time = config.processing_time 
self.query_timeout = config.query_timeout + self.schema_registry_client = config.schema_registry_client if config.schema_registry_client else None self.join_keys = [fs.get_entity(entity).join_key for entity in sfv.entities] + + if isinstance(sfv.stream_source.kafka_options.message_format, ConfluentAvroFormat): + self.init_confluent_avro_processor() + super().__init__(fs=fs, sfv=sfv, data_source=sfv.stream_source) + def init_confluent_avro_processor(self) -> None: + """Extra initialization for Confluent Avro processor.""" + self.deserializer = AvroDeserializer(schema_registry_client=self.schema_registry_client.get_client()) + def ingest_stream_feature_view(self, to: PushMode = PushMode.ONLINE) -> None: ingested_stream_df = self._ingest_stream_data() transformed_df = self._construct_transformation_plan(ingested_stream_df) @@ -70,7 +91,8 @@ def ingest_stream_feature_view(self, to: PushMode = PushMode.ONLINE) -> None: return online_store_query def _ingest_stream_data(self) -> StreamTable: - """Only supports json and avro formats currently.""" + """Only supports json, avro and confluent_avro formats currently.""" + # Test that we reach this path, and stop. if self.format == "json": if not isinstance( self.data_source.kafka_options.message_format, JsonFormat @@ -94,6 +116,21 @@ def _ingest_stream_data(self) -> StreamTable: ) .select("table.*") ) + elif self.format == "confluent_avro": + if not isinstance( + self.data_source.kafka_options.message_format, ConfluentAvroFormat + ): + raise ValueError("kafka source message format is not confluent_avro format") + + stream_df = ( + self.spark.readStream.format("kafka") + .options(**self.kafka_options_config) + .load() + .select( + self.deserializer(col("value")) + ) + .select("table.*") + ) else: if not isinstance( self.data_source.kafka_options.message_format, AvroFormat diff --git a/sdk/python/feast/infra/feature_servers/multicloud/Dockerfile b/sdk/python/feast/infra/feature_servers/multicloud/Dockerfile index c95c515fb4..fdd8e3ac51 100644 --- a/sdk/python/feast/infra/feature_servers/multicloud/Dockerfile +++ b/sdk/python/feast/infra/feature_servers/multicloud/Dockerfile @@ -4,7 +4,6 @@ RUN apt update && \ apt install -y \ jq \ python3-dev \ - default-libmysqlclient-dev \ build-essential RUN pip install pip --upgrade diff --git a/sdk/python/feast/infra/feature_servers/multicloud/Dockerfile.dev b/sdk/python/feast/infra/feature_servers/multicloud/Dockerfile.dev index ecbc199a5b..3fc1355d7a 100644 --- a/sdk/python/feast/infra/feature_servers/multicloud/Dockerfile.dev +++ b/sdk/python/feast/infra/feature_servers/multicloud/Dockerfile.dev @@ -4,7 +4,6 @@ RUN apt update && \ apt install -y \ jq \ python3-dev \ - default-libmysqlclient-dev \ build-essential RUN pip install pip --upgrade diff --git a/sdk/python/requirements/py3.10-ci-requirements.txt b/sdk/python/requirements/py3.10-ci-requirements.txt index 223e76b2e6..9f79bc70d0 100644 --- a/sdk/python/requirements/py3.10-ci-requirements.txt +++ b/sdk/python/requirements/py3.10-ci-requirements.txt @@ -154,6 +154,8 @@ comm==0.1.3 # via ipykernel coverage[toml]==7.2.7 # via pytest-cov +confluent_kafka==2.0.2 + # via eg-feast (setup.py) cryptography==40.0.2 # via # adal @@ -534,8 +536,6 @@ mypy-extensions==1.0.0 # mypy mypy-protobuf==3.1.0 # via eg-feast (setup.py) -mysqlclient==2.2.0 - # via eg-feast (setup.py) nbclient==0.8.0 # via nbconvert nbconvert==7.7.3 diff --git a/sdk/python/requirements/py3.8-ci-requirements.txt b/sdk/python/requirements/py3.8-ci-requirements.txt index 0a1e7d74de..feed0d5ca3 100644 
--- a/sdk/python/requirements/py3.8-ci-requirements.txt +++ b/sdk/python/requirements/py3.8-ci-requirements.txt @@ -156,6 +156,8 @@ comm==0.1.3 # via ipykernel coverage[toml]==7.2.7 # via pytest-cov +confluent_kafka==2.0.2 + # via eg-feast (setup.py) cryptography==40.0.2 # via # adal @@ -529,8 +531,6 @@ mypy-extensions==1.0.0 # mypy mypy-protobuf==3.1 # via feast (setup.py) -mysqlclient==2.1.1 - # via feast (setup.py) nbclassic==1.0.0 # via notebook nbclient==0.8.0 diff --git a/sdk/python/requirements/py3.9-ci-requirements.txt b/sdk/python/requirements/py3.9-ci-requirements.txt index 31eb4496c6..c5e0492fd7 100644 --- a/sdk/python/requirements/py3.9-ci-requirements.txt +++ b/sdk/python/requirements/py3.9-ci-requirements.txt @@ -154,6 +154,8 @@ comm==0.1.3 # via ipykernel coverage[toml]==7.2.7 # via pytest-cov +confluent_kafka==2.0.2 + # via eg-feast (setup.py) cryptography==40.0.2 # via # adal @@ -540,8 +542,6 @@ mypy-extensions==1.0.0 # mypy mypy-protobuf==3.1.0 # via eg-feast (setup.py) -mysqlclient==2.2.0 - # via eg-feast (setup.py) nbclient==0.8.0 # via nbconvert nbconvert==7.7.3 diff --git a/sdk/python/tests/unit/infra/test_streaming_ingestion.py b/sdk/python/tests/unit/infra/test_streaming_ingestion.py new file mode 100644 index 0000000000..74a39cefe0 --- /dev/null +++ b/sdk/python/tests/unit/infra/test_streaming_ingestion.py @@ -0,0 +1,76 @@ +# Copyright 2020 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import assertpy +import pytest + +from datetime import timedelta + +from feast.entity import Entity +from feast.value_type import ValueType +from feast.infra.contrib.spark_kafka_processor import SparkKafkaProcessor +from tests.data.data_creator import create_basic_driver_dataset +from feast.infra.offline_stores.contrib.spark_offline_store.tests.data_source import ( + SparkDataSourceCreator, +) +from tests.integration.feature_repos.integration_test_repo_config import ( + IntegrationTestRepoConfig, +) +from tests.integration.feature_repos.repo_configuration import ( + construct_test_environment, +) +from tests.integration.feature_repos.universal.online_store.redis import ( + RedisOnlineStoreCreator, +) +from feast.data_source import KafkaSource +from feast.data_format import AvroFormat, ConfluentAvroFormat +from feast import FileSource +from feast.stream_feature_view import StreamFeatureView + + +def test_streaming_ingestion(): + + spark_config = IntegrationTestRepoConfig( + provider="local", + online_store_creator=RedisOnlineStoreCreator, + offline_store_creator=SparkDataSourceCreator, + batch_engine={"type": "spark.engine", "partitions": 10}, + ) + spark_environment = construct_test_environment( + spark_config, None, entity_key_serialization_version=1 + ) + + df = create_basic_driver_dataset() + + # Make a stream source. 
+ stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=ConfluentAvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + ) + StreamFeatureView( + name="test kafka stream feature view", + entities=[], + ttl=timedelta(days=30), + source=stream_source, + aggregations=[], + ) + + + + # processor = SparkKafkaProcessor() +# + diff --git a/sdk/python/tests/unit/test_feature_views.py b/sdk/python/tests/unit/test_feature_views.py index f421751300..1ed8b7fb64 100644 --- a/sdk/python/tests/unit/test_feature_views.py +++ b/sdk/python/tests/unit/test_feature_views.py @@ -5,7 +5,7 @@ from feast.aggregation import Aggregation from feast.batch_feature_view import BatchFeatureView -from feast.data_format import AvroFormat +from feast.data_format import AvroFormat, ConfluentAvroFormat from feast.data_source import KafkaSource, PushSource from feast.entity import Entity from feast.feature_view import FeatureView @@ -48,7 +48,7 @@ def test_create_batch_feature_view(): name="kafka", timestamp_field="event_timestamp", kafka_bootstrap_servers="", - message_format=AvroFormat(""), + message_format=ConfluentAvroFormat(""), topic="topic", batch_source=FileSource(path="some path"), ) diff --git a/sdk/python/tests/unit/test_sql_registry.py b/sdk/python/tests/unit/test_sql_registry.py index 5fba4013bd..3cf68b3e08 100644 --- a/sdk/python/tests/unit/test_sql_registry.py +++ b/sdk/python/tests/unit/test_sql_registry.py @@ -104,7 +104,7 @@ def mysql_registry(): registry_config = RegistryConfig( registry_type="sql", - path=f"mysql+mysqldb://{POSTGRES_USER}:{POSTGRES_PASSWORD}@127.0.0.1:{container_port}/{POSTGRES_DB}", + path=f"mysql+pymysql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@127.0.0.1:{container_port}/{POSTGRES_DB}", cache_ttl_seconds=60, ) diff --git a/setup.py b/setup.py index 3a0a2c4314..5ebf3adfed 100644 --- a/setup.py +++ b/setup.py @@ -64,7 +64,7 @@ "pyarrow>=4,<12", "pydantic>=1,<2", "pygments>=2.12.0,<3", - "PyYAML>=5.4.0,<7", + # "PyYAML>=5.4.0,<7", "requests", "SQLAlchemy[mypy]>1,<2", "tabulate>=0.8.0,<1", @@ -119,7 +119,7 @@ "psycopg2-binary>=2.8.3,<3", ] -MYSQL_REQUIRED = ["mysqlclient", "pymysql", "types-PyMySQL"] +MYSQL_REQUIRED = ["pymysql", "types-PyMySQL"] HBASE_REQUIRED = [ "happybase>=1.2.0,<3", @@ -157,6 +157,12 @@ "elasticsearch==8.8", ] +CONFLUENT_REQUIRED = [ + "pipenv", + "confluent_kafka>=2.0.2", + "pip-system-certs==4.0" +] + CI_REQUIRED = ( [ "build", @@ -195,7 +201,7 @@ "types-protobuf~=3.19.22", "types-python-dateutil", "types-pytz", - "types-PyYAML", + # "types-PyYAML", "types-redis", "types-requests", "types-setuptools", @@ -218,6 +224,7 @@ + HAZELCAST_REQUIRED + MILVUS_REQUIRED + ELASTICSEARCH_REQUIRED + + CONFLUENT_REQUIRED )
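Usage sketch (not part of the patch): declaring a Kafka stream source with the new ConfluentAvroFormat from data_format.py. The broker address, topic, schema string and batch-source path below are placeholders.

from datetime import timedelta

from feast import FileSource
from feast.data_format import ConfluentAvroFormat
from feast.data_source import KafkaSource
from feast.stream_feature_view import StreamFeatureView

# Placeholder schema; with Confluent Avro the authoritative schema normally
# lives in the schema registry rather than in the repo definition.
driver_schema_json = '{"type": "record", "name": "DriverStats", "fields": []}'

driver_stats_source = KafkaSource(
    name="driver_stats_stream",
    kafka_bootstrap_servers="broker:9092",  # placeholder
    topic="driver_stats",  # placeholder
    timestamp_field="event_timestamp",
    message_format=ConfluentAvroFormat(schema_json=driver_schema_json),
    batch_source=FileSource(path="driver_stats.parquet"),  # placeholder
)

driver_stats_view = StreamFeatureView(
    name="driver_stats_stream_view",
    entities=[],
    ttl=timedelta(days=1),
    source=driver_stats_source,
    aggregations=[],
)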
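A quick round-trip check of the new pydantic model, using only what the patch adds to stream_format_model.py:

from feast.data_format import ConfluentAvroFormat
from feast.expediagroup.pydantic_models.stream_format_model import (
    ConfluentAvroFormatModel,
)

fmt = ConfluentAvroFormat(schema_json='{"type": "string"}')
model = ConfluentAvroFormatModel.from_stream_format(fmt)

# The discriminator and the schema should survive the round trip.
assert model.format == "ConfluentAvroFormatModel"
assert model.to_stream_format().schema_json == fmt.schema_json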
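A sketch of how the SchemaRegistry wrapper and the new SparkProcessorConfig.schema_registry_client field might be wired together. The credentials, urn, environment, cert path, repo path and feature view name are placeholders, and mode/source are the generic ProcessorConfig fields rather than anything added by this patch.

from pyspark.sql import SparkSession

from feast import FeatureStore
from feast.expediagroup.schema_registry.schema_registry import SchemaRegistry
from feast.infra.contrib.spark_kafka_processor import (
    SparkKafkaProcessor,
    SparkProcessorConfig,
)

registry = SchemaRegistry()
registry.initialize_client(
    user="svc-user",  # placeholder
    password="svc-password",  # placeholder
    urn="urn:example:driver-stats",  # placeholder
    environment="test",  # placeholder
    cert_path="/path/to/ca.pem",  # placeholder
)

store = FeatureStore(repo_path=".")  # placeholder repo
spark = SparkSession.builder.master("local[*]").getOrCreate()

processor = SparkKafkaProcessor(
    fs=store,
    sfv=store.get_stream_feature_view("driver_stats_stream_view"),  # placeholder
    config=SparkProcessorConfig(
        mode="spark",
        source="kafka",
        spark_session=spark,
        processing_time="30 seconds",
        query_timeout=15,
        schema_registry_client=registry,
    ),
)
processor.ingest_stream_feature_view()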
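The confluent_avro branch of _ingest_stream_data selects the AvroDeserializer directly over the value column; one way to make that concrete is to wrap the deserializer in a Spark UDF. A minimal sketch, assuming the caller supplies the Kafka options, topic, schema-registry config and a StructType matching the Avro record (these names are illustrative, not part of the patch):

from typing import Dict, Optional

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import MessageField, SerializationContext
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StructType


def read_confluent_avro_stream(
    spark: SparkSession,
    kafka_options: Dict[str, str],
    topic: str,
    schema_registry_conf: Dict[str, str],
    payload_schema: StructType,
) -> DataFrame:
    """Read a Kafka stream whose values use the Confluent Avro wire format."""

    def deserialize(value: Optional[bytes]):
        if value is None:
            return None
        # Built lazily so only the plain config dict is shipped to executors.
        # AvroDeserializer strips the 5-byte wire-format header and fetches
        # the writer schema from the registry before decoding the record.
        deserializer = AvroDeserializer(SchemaRegistryClient(schema_registry_conf))
        return deserializer(value, SerializationContext(topic, MessageField.VALUE))

    deserialize_udf = udf(deserialize, payload_schema)

    return (
        spark.readStream.format("kafka")
        .options(**kafka_options)
        .load()
        .select(deserialize_udf(col("value")).alias("table"))
        .select("table.*")
    )

Building the deserializer inside the UDF avoids pickling a live registry client; caching it per executor would be the obvious next step.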