Confluent avro changes #81

Open

wants to merge 29 commits into master

Changes from all commits (29 commits)
5993c32
Add empty model and format hooks for ConfluentAvro format
expediamatt Jan 18, 2024
4f9fb06
More confluent avro changes
expediamatt Feb 4, 2024
c73a855
Merge branch 'master' of https://github.com/ExpediaGroup/feast into c…
expediamatt Feb 7, 2024
b2c2154
Empty commit
expediamatt Feb 7, 2024
8751083
Empty commit 2
expediamatt Feb 7, 2024
d86093d
Empty commit 3
expediamatt Feb 7, 2024
503a701
Empty commit 4
expediamatt Feb 7, 2024
ac014b5
Empty commit
expediamatt Feb 8, 2024
67cb991
Empty commit
expediamatt Feb 8, 2024
d96f8e2
Merge branch 'master' of https://github.com/ExpediaGroup/feast into c…
expediamatt Feb 8, 2024
f30e0d2
Update streaming tests GA
expediamatt Feb 8, 2024
c6ffa7d
Add workflow_dispatch as a trigger for unit_tests github action
expediamatt Feb 11, 2024
1b5d0c0
Add confluent packages to setup.py
expediamatt Feb 11, 2024
06f0108
Fix test-python-universal-spark to use docker ala the integration tests
expediamatt Feb 11, 2024
6673e7a
Fix test-python-universal-spark to use docker ala the integration tes…
expediamatt Feb 11, 2024
e3c7f42
Link spark_kafka_processor
expediamatt Feb 11, 2024
35a022c
Add SchemaRegistry wrapper
expediamatt Feb 11, 2024
a448480
Add AvroDeserializer to Spark Kafka Processor
expediamatt Feb 11, 2024
7ed2d37
Remove unnecessary dependency on mysqlclient - equal to feast PR 3925
expediamatt Feb 12, 2024
cc96aba
Merge branch 'master' of https://github.com/ExpediaGroup/feast into c…
expediamatt Feb 13, 2024
d2f0a07
Preliminary checkin for test_streaming_ingestion
expediamatt Feb 13, 2024
85aeb7d
Start adding FeatureView to spark kafka processor, and add a sketch o…
expediamatt Feb 13, 2024
84952da
Change dash to underscore in setup.py
expediamatt Feb 14, 2024
f125722
Add confluent kafka to requirements
expediamatt Feb 14, 2024
e7c8a0c
Import FeatureView in Spark Kafka Processor
expediamatt Feb 14, 2024
2fc310d
Take 57
expediamatt Feb 14, 2024
d0d86bf
Take 58
expediamatt Feb 14, 2024
32f53c3
Take 59
expediamatt Feb 14, 2024
35930f9
Take 60
expediamatt Feb 14, 2024
2 changes: 1 addition & 1 deletion .github/workflows/streaming-tests.yml
@@ -51,7 +51,7 @@ jobs:
brew install mysql
PATH=$PATH:/usr/local/mysql/bin
- name: Work around Homebrew MySQL being broken
# See https://github.com/Homebrew/homebrew-core/issues/130258 for more details.
# See https://github.com/Homebrew/homebrew-core/issues/130258 for more details.
if: startsWith(matrix.os, 'macOS')
run: |
brew install zlib
165 changes: 125 additions & 40 deletions .github/workflows/test-python-universal-spark.yml
@@ -1,42 +1,112 @@
name: test-python-universal-spark

on:
  repository_dispatch:
    branches: [ "confluent_avro_changes", "master" ]
  workflow_dispatch:
    branches: [ "confluent_avro_changes", "master" ]
on: [pull_request, workflow_dispatch]

# concurrency is currently broken, see details https://github.com/actions/runner/issues/1532
#concurrency:
# group: pr-integration-tests-${{ github.event.pull_request.number }}
# cancel-in-progress: true

jobs:
build:

build-docker-image:
# all jobs MUST have this if check for 'ok-to-test' or 'approved' for security purposes.
if:
((github.event.action == 'labeled' && (github.event.label.name == 'approved' || github.event.label.name == 'lgtm' || github.event.label.name == 'ok-to-test')) ||
(github.event.action != 'labeled' && (contains(github.event.pull_request.labels.*.name, 'ok-to-test') || contains(github.event.pull_request.labels.*.name, 'approved') || contains(github.event.pull_request.labels.*.name, 'lgtm')))) &&
github.repository == 'feast-dev/feast'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
with:
# pull_request_target runs the workflow in the context of the base repo
# as such actions/checkout needs to be explicit configured to retrieve
# code from the PR.
ref: refs/pull/${{ github.event.pull_request.number }}/merge
submodules: recursive
- name: Set up QEMU
uses: docker/setup-qemu-action@v1
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
with:
install: true
- name: Set up AWS SDK
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: us-west-2
- name: Login to Amazon ECR
id: login-ecr
uses: aws-actions/amazon-ecr-login@v1
- name: Set ECR image tag
id: image-tag
run: echo "::set-output name=DOCKER_IMAGE_TAG::`git rev-parse HEAD`"
- name: Cache Public ECR Image
id: lambda_python_3_9
uses: actions/cache@v2
with:
path: ~/cache
key: lambda_python_3_9
- name: Handle Cache Miss (pull public ECR image & save it to tar file)
if: steps.cache-primes.outputs.cache-hit != 'true'
run: |
mkdir -p ~/cache
docker pull public.ecr.aws/lambda/python:3.9
docker save public.ecr.aws/lambda/python:3.9 -o ~/cache/lambda_python_3_9.tar
- name: Handle Cache Hit (load docker image from tar file)
if: steps.cache-primes.outputs.cache-hit == 'true'
run: |
docker load -i ~/cache/lambda_python_3_9.tar
- name: Build and push
env:
ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
ECR_REPOSITORY: feast-python-server
run: |
docker build \
--file sdk/python/feast/infra/feature_servers/aws_lambda/Dockerfile \
--tag $ECR_REGISTRY/$ECR_REPOSITORY:${{ steps.image-tag.outputs.DOCKER_IMAGE_TAG }} \
--load \
.
docker push $ECR_REGISTRY/$ECR_REPOSITORY:${{ steps.image-tag.outputs.DOCKER_IMAGE_TAG }}
outputs:
DOCKER_IMAGE_TAG: ${{ steps.image-tag.outputs.DOCKER_IMAGE_TAG }}
spark-test-python:
# all jobs MUST have this if check for 'ok-to-test' or 'approved' for security purposes.
if:
((github.event.action == 'labeled' && (github.event.label.name == 'approved' || github.event.label.name == 'lgtm' || github.event.label.name == 'ok-to-test')) ||
(github.event.action != 'labeled' && (contains(github.event.pull_request.labels.*.name, 'ok-to-test') || contains(github.event.pull_request.labels.*.name, 'approved') || contains(github.event.pull_request.labels.*.name, 'lgtm')))) &&
github.repository == 'feast-dev/feast'
needs: build-docker-image
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
python-version: ["3.8"]
os: [ubuntu-latest, macOS-latest]
exclude:
- os: macOS-latest
python-version: "3.8"
python-version: [ "3.9" ]
os: [ ubuntu-latest ]
env:
OS: ${{ matrix.os }}
PYTHON: ${{ matrix.python-version }}
services:
redis:
image: redis
ports:
- 6379:6379
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- name: Increase swapfile
# Increase ubuntu's swapfile to avoid running out of resources which causes the action to terminate
if: startsWith(matrix.os, 'ubuntu')
run: |
sudo swapoff -a
sudo fallocate -l 8G /swapfile
sudo chmod 600 /swapfile
sudo mkswap /swapfile
sudo swapon /swapfile
sudo swapon --show
- uses: actions/checkout@v2
with:
# pull_request_target runs the workflow in the context of the base repo
# as such actions/checkout needs to be explicit configured to retrieve
# code from the PR.
ref: refs/pull/${{ github.event.pull_request.number }}/merge
submodules: recursive
- name: Setup Python
id: setup-python
uses: actions/setup-python@v2
id: setup-python
with:
python-version: ${{ matrix.python-version }}
architecture: x64
@@ -45,17 +115,24 @@ jobs:
uses: actions/setup-go@v2
with:
go-version: 1.19.7
- name: Install mysql on macOS
if: startsWith(matrix.os, 'macOS')
run: |
brew install mysql
PATH=$PATH:/usr/local/mysql/bin
- name: Work around Homebrew MySQL being broken
# See https://github.com/Homebrew/homebrew-core/issues/130258 for more details.
if: startsWith(matrix.os, 'macOS')
run: |
brew install zlib
ln -sv $(brew --prefix zlib)/lib/libz.dylib $(brew --prefix)/lib/libzlib.dylib
- name: Authenticate to Google Cloud
uses: 'google-github-actions/auth@v1'
with:
credentials_json: '${{ secrets.GCP_SA_KEY }}'
- name: Set up gcloud SDK
uses: google-github-actions/setup-gcloud@v1
with:
project_id: ${{ secrets.GCP_PROJECT_ID }}
- name: Use gcloud CLI
run: gcloud info
- name: Set up AWS SDK
uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: us-west-2
- name: Use AWS CLI
run: aws sts get-caller-identity
- name: Upgrade pip version
run: |
pip install --upgrade pip
@@ -74,8 +151,7 @@ jobs:
restore-keys: |
${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-pip-
- name: Install pip-tools
run: |
pip install -U pip-tools
run: pip install pip-tools
- name: Install apache-arrow on ubuntu
if: matrix.os == 'ubuntu-latest'
run: |
@@ -92,8 +168,17 @@ jobs:
brew install pkg-config
- name: Install dependencies
run: make install-python-ci-dependencies
- name: Test Spark
- name: Setup Redis Cluster
run: |
make test-python-universal-spark


docker pull vishnunair/docker-redis-cluster:latest
docker run -d -p 6001:6379 -p 6002:6380 -p 6003:6381 -p 6004:6382 -p 6005:6383 -p 6006:6384 --name redis-cluster vishnunair/docker-redis-cluster
- name: Test python
if: ${{ always() }} # this will guarantee that step won't be canceled and resources won't leak
env:
FEAST_SERVER_DOCKER_IMAGE_TAG: ${{ needs.build-docker-image.outputs.DOCKER_IMAGE_TAG }}
SNOWFLAKE_CI_DEPLOYMENT: ${{ secrets.SNOWFLAKE_CI_DEPLOYMENT }}
SNOWFLAKE_CI_USER: ${{ secrets.SNOWFLAKE_CI_USER }}
SNOWFLAKE_CI_PASSWORD: ${{ secrets.SNOWFLAKE_CI_PASSWORD }}
SNOWFLAKE_CI_ROLE: ${{ secrets.SNOWFLAKE_CI_ROLE }}
SNOWFLAKE_CI_WAREHOUSE: ${{ secrets.SNOWFLAKE_CI_WAREHOUSE }}
run: make test-python-universal-spark
2 changes: 1 addition & 1 deletion .github/workflows/unit_tests.yml
@@ -1,6 +1,6 @@
name: unit-tests

on: [pull_request]
on: [pull_request, workflow_dispatch]
jobs:
unit-test-python:
runs-on: ${{ matrix.os }}
8 changes: 8 additions & 0 deletions protos/feast/core/DataFormat.proto
@@ -48,6 +48,13 @@ message StreamFormat {
    string schema_json = 1;
  }

  // Defines options for the avro data format
  message ConfluentAvroFormat {
    // Optional if used in a File DataSource as schema is embedded in avro file.
    // Specifies the schema of the Avro message as JSON string.
    string schema_json = 1;
  }

  message JsonFormat {
    string schema_json = 1;
  }
@@ -57,5 +64,6 @@ message StreamFormat {
    AvroFormat avro_format = 1;
    ProtoFormat proto_format = 2;
    JsonFormat json_format = 3;
    ConfluentAvroFormat confluent_avro_format = 4;
  }
}
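
Note (illustrative, not part of this PR): a minimal sketch of how the new confluent_avro_format oneof would be set and inspected through the generated Python protos, assuming the stubs are regenerated after this proto change; the schema string is a placeholder.

```python
# Sketch only: exercises the ConfluentAvroFormat oneof member added above.
from feast.protos.feast.core.DataFormat_pb2 import StreamFormat as StreamFormatProto

# Placeholder Avro schema; a real one would describe the Kafka message payload.
schema_json = '{"type": "record", "name": "driver_stats", "fields": []}'

proto = StreamFormatProto(
    confluent_avro_format=StreamFormatProto.ConfluentAvroFormat(schema_json=schema_json)
)

assert proto.WhichOneof("format") == "confluent_avro_format"
print(proto.confluent_avro_format.schema_json)
```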
21 changes: 21 additions & 0 deletions sdk/python/feast/data_format.py
@@ -89,6 +89,8 @@ def from_proto(cls, proto):
        fmt = proto.WhichOneof("format")
        if fmt == "avro_format":
            return AvroFormat(schema_json=proto.avro_format.schema_json)
        if fmt == "confluent_avro_format":
            return ConfluentAvroFormat(schema_json=proto.confluent_avro_format.schema_json)
        if fmt == "json_format":
            return JsonFormat(schema_json=proto.json_format.schema_json)
        if fmt == "proto_format":
@@ -115,6 +117,25 @@ def to_proto(self):
        return StreamFormatProto(avro_format=proto)


class ConfluentAvroFormat(StreamFormat):
    """
    Defines the ConfluentAvro streaming data format that encodes data in ConfluentAvro format
    """

    def __init__(self, schema_json: str):
        """
        Construct a new ConfluentAvro data format.

        Args:
            schema_json: ConfluentAvro schema definition in JSON
        """
        self.schema_json = schema_json

    def to_proto(self):
        proto = StreamFormatProto.ConfluentAvroFormat(schema_json=self.schema_json)
        return StreamFormatProto(confluent_avro_format=proto)


class JsonFormat(StreamFormat):
    """
    Defines the Json streaming data format that encodes data in Json format
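
Reviewer note (illustrative, not taken from this PR): a sketch of the intended SDK usage, round-tripping the new format through its proto form and wiring it into a KafkaSource the same way AvroFormat is used today; the source name, brokers, topic, and schema below are placeholders, and it is assumed ConfluentAvroFormat is accepted anywhere AvroFormat is.

```python
from feast import KafkaSource
from feast.data_format import ConfluentAvroFormat, StreamFormat

# Placeholder schema; in practice this is the subject's Avro schema as a JSON string.
message_format = ConfluentAvroFormat(
    schema_json='{"type": "record", "name": "driver_stats", "fields": []}'
)

# Round trip through the StreamFormat proto extended in DataFormat.proto.
proto = message_format.to_proto()
assert proto.WhichOneof("format") == "confluent_avro_format"
assert StreamFormat.from_proto(proto).schema_json == message_format.schema_json

# Hypothetical stream source using the new message format.
driver_stats_stream = KafkaSource(
    name="driver_stats_stream",
    kafka_bootstrap_servers="localhost:9092",
    topic="driver_stats",
    timestamp_field="event_timestamp",
    message_format=message_format,
)
```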
@@ -17,6 +17,7 @@
from feast.expediagroup.pydantic_models.stream_format_model import (
    AnyStreamFormat,
    AvroFormatModel,
    ConfluentAvroFormatModel,
    JsonFormatModel,
    ProtoFormatModel,
)
@@ -230,7 +231,7 @@ def from_data_source(
)


SUPPORTED_MESSAGE_FORMATS = [AvroFormatModel, JsonFormatModel, ProtoFormatModel]
SUPPORTED_MESSAGE_FORMATS = [AvroFormatModel, ConfluentAvroFormatModel, JsonFormatModel, ProtoFormatModel]
SUPPORTED_KAFKA_BATCH_SOURCES = [RequestSourceModel, SparkSourceModel]


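
Quick sanity-check sketch for the allow-list change above; the importing module path is assumed, since the file name is not shown in this view.

```python
# Assumed module path for the file changed above; not shown in this diff view.
from feast.expediagroup.pydantic_models.data_source_model import (
    SUPPORTED_MESSAGE_FORMATS,
)
from feast.expediagroup.pydantic_models.stream_format_model import (
    ConfluentAvroFormatModel,
)

# The new pydantic model should now pass the Kafka message-format allow-list.
assert ConfluentAvroFormatModel in SUPPORTED_MESSAGE_FORMATS
```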
@@ -4,7 +4,7 @@
from pydantic import Field as PydanticField
from typing_extensions import Annotated, Self

from feast.data_format import AvroFormat, JsonFormat, ProtoFormat
from feast.data_format import AvroFormat, ConfluentAvroFormat, JsonFormat, ProtoFormat


class StreamFormatModel(BaseModel):
@@ -63,6 +63,37 @@ def from_stream_format(
        return cls(schoma=avro_format.schema_json)


class ConfluentAvroFormatModel(StreamFormatModel):
    """
    Pydantic Model of a Feast ConfluentAvroFormat.
    """

    format: Literal["ConfluentAvroFormatModel"] = "ConfluentAvroFormatModel"
    schoma: str

    def to_stream_format(self) -> ConfluentAvroFormat:
        """
        Given a Pydantic ConfluentAvroFormatModel, create and return a ConfluentAvroFormat.

        Returns:
            A ConfluentAvroFormat.
        """
        return ConfluentAvroFormat(schema_json=self.schoma)

    @classmethod
    def from_stream_format(
        cls,
        confluent_avro_format,
    ) -> Self:  # type: ignore
        """
        Converts a ConfluentAvroFormat object to its pydantic model representation.

        Returns:
            A ConfluentAvroFormatModel.
        """
        return cls(schoma=confluent_avro_format.schema_json)


class JsonFormatModel(StreamFormatModel):
    """
    Pydantic Model of a Feast JsonFormat.
@@ -128,6 +159,6 @@ def from_stream_format(
# https://blog.devgenius.io/deserialize-child-classes-with-pydantic-that-gonna-work-784230e1cf83
# This lets us discriminate child classes of DataSourceModel with type hints.
AnyStreamFormat = Annotated[
    Union[AvroFormatModel, JsonFormatModel, ProtoFormatModel],
    Union[ConfluentAvroFormatModel, AvroFormatModel, JsonFormatModel, ProtoFormatModel],
    PydanticField(discriminator="format"),
]
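
Finally, a short round-trip sketch using only names introduced in this diff; the schema string is a placeholder.

```python
from feast.data_format import ConfluentAvroFormat
from feast.expediagroup.pydantic_models.stream_format_model import (
    ConfluentAvroFormatModel,
)

fmt = ConfluentAvroFormat(
    schema_json='{"type": "record", "name": "driver_stats", "fields": []}'
)

# StreamFormat -> pydantic model -> StreamFormat
model = ConfluentAvroFormatModel.from_stream_format(fmt)
assert model.format == "ConfluentAvroFormatModel"
assert model.to_stream_format().schema_json == fmt.schema_json
```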