Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate Go transformations endpoint into Expedia provider #106

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions go/internal/feast/featurestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,15 @@ func NewFeatureStore(config *registry.RepoConfig, callback transformation.Transf
if err != nil {
return nil, err
}
sanitizedProjectName := strings.Replace(config.Project, "_", "-", -1)
productName := os.Getenv("PRODUCT")
endpoint := fmt.Sprintf("%s-transformations.%s.svc.cluster.local:80", sanitizedProjectName, productName)
transformationService, _ := transformation.NewGrpcTransformationService(config, endpoint)

var transformationService *transformation.GrpcTransformationService = nil

if config.GoTransformationsServer {
if config.GoTransformationsEndpoint == "" {
return nil, errors.New("Transformations server endpoint is missing. Update featue_store.yaml with go_transformations_endpint configuration")
}
transformationService, _ = transformation.NewGrpcTransformationService(config, config.GoTransformationsEndpoint)
}
return &FeatureStore{
config: config,
registry: registry,
Expand Down
5 changes: 5 additions & 0 deletions go/internal/feast/registry/repoconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ type RepoConfig struct {
RepoPath string `json:"repo_path"`
// EntityKeySerializationVersion
EntityKeySerializationVersion int64 `json:"entity_key_serialization_version"`
// If false, use gopy bindings to calculate ODFV transformations.
// "True" value required for Go feature server to serve ODFVs with stability and at scale.
GoTransformationsServer bool `json:"go_transformations_server"`
// Transformation server base endpoint.
GoTransformationsEndpoint string `json:"go_transformations_endpoint"`
}

type RegistryConfig struct {
Expand Down
27 changes: 27 additions & 0 deletions go/internal/feast/registry/repoconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,30 @@ func TestGetRegistryConfig_CacheTtlSecondsTypes(t *testing.T) {
assert.Equal(t, int64(60), registryConfig.CacheTtlSeconds)
}
}

func TestGoTransformationsEndpoint(t *testing.T) {
dir, err := os.MkdirTemp("", "feature_repo_*")
assert.Nil(t, err)
defer func() {
assert.Nil(t, os.RemoveAll(dir))
}()
filePath := filepath.Join(dir, "feature_store.yaml")
data := []byte(`
registry:
path: data/registry.db
project: feature_repo
provider: local
online_store:
type: redis
connection_string: "localhost:6379"
go_transformations_endpoint: https://go.dev:9999
go_transformations_server: True
`)
err = os.WriteFile(filePath, data, 0666)
assert.Nil(t, err)
config, err := NewRepoConfigFromFile(dir)
assert.Nil(t, err)
assert.Equal(t, dir, config.RepoPath)
assert.Equal(t, "https://go.dev:9999", config.GoTransformationsEndpoint)
assert.Equal(t, true, config.GoTransformationsServer)
}
13 changes: 13 additions & 0 deletions sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,18 @@ class RepoConfig(FeastBaseModel):
go_feature_retrieval: Optional[bool] = False
""" If True, use the embedded Go code to retrieve features instead of the Python SDK. """

go_transformations_server: Optional[bool] = False
""" If True, use the transformations server to perform ODVF transformations in Go feature server. """

go_transformations_endpoint: Optional[StrictStr] = ""
""" Specify the endpoint for Go feature server to find the transformations server.
NOTE: Unless go_transformations_server is False, the Go feature server will throw errors if this is
blank or null.
"""

go_transformations_port: Optional[StrictInt] = 80
""" Specify the port for Go feature server to find the transformations server. """

entity_key_serialization_version: StrictInt = 1
""" Entity key serialization version: This version is used to control what serialization scheme is
used when writing data to the online store.
Expand Down Expand Up @@ -221,6 +233,7 @@ def __init__(self, **data: Any):
},
}
self._offline_config = spark_offline_config
self._go_transformation_server = True

self._online_store = None
if "online_store" in data:
Expand Down
Loading