Skip to content

Commit

Permalink
feat: create delete assets API (#77)
Browse files Browse the repository at this point in the history
* feat(proto): generate proto regarding delete assets

* feat(asset): add refreshed_at field

* feat: create translator from query expr to SQL query and ES query

* feat(module): add expr-lang/expr module

* feat(docs): add comments in QueryExprTranslator

* feat(asset): change migration update

* feat(asset): add down migration

* feat(translator): mark as private func for func that only used in translator

* feat: create delete assets API by query expr

* refactor: make interface for query expr and implement to postgresql and elasticsearch

* refactor: resolve all lint issues

* fix: add refreshed_at in insert asset query

* feat: add refreshed_at field in get all assets

* fix: update return error in delete assets

* fix: update asynchronous process in deletion assets

* refactor: fix lint issues

* feat: fix ConditionalNode logic and toString format

* refactor: remove redundant code, and make return error as soon as possible

* feat: update mock using mockery

* feat: equalize current time at created_at and updated_at based on refreshed_at both in postgresql and elasticsearch

* refactor: rename var

* refactor: remove unused go module

* test: fix existing unit tests

* test: fix existing unit tests

* refactor: makes codes to more readable

* test: create unit test for deletion API

* test: create unit test for converter

* feat: add validation in deletion query and remove redundant code

* feat: make can equals (==) or IN for type and service in delete asset expr

* test: create unit test for delete asset expr

* feat: change flow of deletion assets and refactor codes based on feedbacks

* test: fix unit test

* test: fix unit test

* refactor: remove unused comment

* feat: make maxAttemptRetry to be configurable and improve code performance

* feat: make cancel for context, and remove pointer in ESExpr and SQLExpr

* refactor: make clean as linter suggestion

* test: fix unit test due to cancel func when create new asset service

* refactor: change argument of DeleteByQueryExpr in discovery repository and in situ worker

* feat: add condition when the complex query result is time and refactor codes

* refactor: revise some codes as feedbacks and add test case in es expr test

* refactor: clean code as feedbacks

* refactor: add comment so more clear

* refactor: remove redundant code

* feat: add case for MemberNode which handle nested query

---------

Co-authored-by: Muhammad Luthfi Fahlevi <[email protected]>
  • Loading branch information
luthfifahlevi and Muhammad Luthfi Fahlevi authored Aug 22, 2024
1 parent c0bf977 commit 071df6f
Show file tree
Hide file tree
Showing 52 changed files with 4,798 additions and 1,779 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ help: ##@help show this help
NAME="github.com/goto/compass"
VERSION=$(shell git describe --always --tags 2>/dev/null)
COVERFILE="/tmp/compass.coverprofile"
PROTON_COMMIT := "fe99cc96e060085d6052096e9ba59b4038c691c6"
PROTON_COMMIT := "8375eddcb23d38f601f6036c676493d8feb84a7e"

TOOLS_MOD_DIR = ./tools
TOOLS_DIR = $(abspath ./.tools)
Expand Down
4 changes: 3 additions & 1 deletion cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,14 @@ func runServer(ctx context.Context, cfg *Config) error {
}
}()

assetService := asset.NewService(asset.ServiceDeps{
assetService, cancel := asset.NewService(asset.ServiceDeps{
AssetRepo: assetRepository,
DiscoveryRepo: discoveryRepository,
LineageRepo: lineageRepository,
Worker: wrkr,
Logger: logger,
})
defer cancel()

// init discussion
discussionRepository, err := postgres.NewDiscussionRepository(pgClient, 0)
Expand Down
1 change: 1 addition & 0 deletions compass.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ worker:
sync_job_timeout: 15m
index_job_timeout: 5s
delete_job_timeout: 5s
max_attempt_retry: 3

client:
host: localhost:8081
Expand Down
4 changes: 4 additions & 0 deletions core/asset/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import (
"time"

"github.com/goto/compass/core/user"
"github.com/goto/compass/pkg/queryexpr"
"github.com/r3labs/diff/v2"
)

type Repository interface {
GetAll(context.Context, Filter) ([]Asset, error)
GetCount(context.Context, Filter) (int, error)
GetCountByQueryExpr(ctx context.Context, queryExpr queryexpr.ExprStr) (int, error)
GetByID(ctx context.Context, id string) (Asset, error)
GetByURN(ctx context.Context, urn string) (Asset, error)
GetVersionHistory(ctx context.Context, flt Filter, id string) ([]Asset, error)
Expand All @@ -21,6 +23,7 @@ type Repository interface {
Upsert(ctx context.Context, ast *Asset) (string, error)
DeleteByID(ctx context.Context, id string) error
DeleteByURN(ctx context.Context, urn string) error
DeleteByQueryExpr(ctx context.Context, queryExpr queryexpr.ExprStr) ([]string, error)
AddProbe(ctx context.Context, assetURN string, probe *Probe) error
GetProbes(ctx context.Context, assetURN string) ([]Probe, error)
GetProbesWithFilter(ctx context.Context, flt ProbesFilter) (map[string][]Probe, error)
Expand All @@ -40,6 +43,7 @@ type Asset struct {
Owners []user.User `json:"owners,omitempty" diff:"owners"`
CreatedAt time.Time `json:"created_at" diff:"-"`
UpdatedAt time.Time `json:"updated_at" diff:"-"`
RefreshedAt *time.Time `json:"refreshed_at" diff:"-"`
Version string `json:"version" diff:"-"`
UpdatedBy user.User `json:"updated_by" diff:"-"`
Changelog diff.Changelog `json:"changelog,omitempty" diff:"-"`
Expand Down
73 changes: 73 additions & 0 deletions core/asset/delete_asset_expr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package asset

import (
"errors"
"fmt"
"strings"

"github.com/goto/compass/pkg/generichelper"
"github.com/goto/compass/pkg/queryexpr"
)

var (
	// assetJSONTagsSchema is the result of generichelper.GetJSONTags for a
	// zero-value Asset. Identifiers used in a delete expression are checked
	// against it in isAllIdentifiersExistInStruct.
	// NOTE(review): presumably the list of JSON tag names of Asset's fields —
	// confirm against generichelper.GetJSONTags.
	assetJSONTagsSchema = generichelper.GetJSONTags(Asset{})
	// errTypeOrServiceHasWrongOperator is returned by isUsingRightOperator when
	// the operator recorded for "type" or "service" is neither "==" nor "in"
	// (compared case-insensitively).
	errTypeOrServiceHasWrongOperator = errors.New("identifier type and service must be equals (==) or IN operator")
	// errMissRequiredIdentifier is returned by isRequiredIdentifiersExist when
	// any of "refreshed_at", "type" or "service" has no operator recorded.
	errMissRequiredIdentifier = errors.New("must exists these identifiers: refreshed_at, type, and service")
)

// DeleteAssetExpr wraps a queryexpr.ExprStr and layers the delete-assets
// validation rules on top of it (see Validate). Query conversion is delegated
// to the embedded ExprStr.
type DeleteAssetExpr struct {
	queryexpr.ExprStr
}

// ToQuery delegates to the embedded ExprStr and hands back its query string
// and error unchanged.
func (d DeleteAssetExpr) ToQuery() (string, error) {
	query, err := d.ExprStr.ToQuery()
	return query, err
}

// Validate checks that the wrapped expression is acceptable for deleting
// assets.
//
// It extracts the identifier -> operator map from the expression's string form
// via queryexpr.GetIdentifiersMap and then runs, in order:
//  1. the required identifiers (refreshed_at, type, service) are present;
//  2. type and service use the == or IN operator;
//  3. every identifier is listed in assetJSONTagsSchema.
//
// The first failing step's error is returned as-is; nil means all checks passed.
func (d DeleteAssetExpr) Validate() error {
	operatorsByIdentifier, err := queryexpr.GetIdentifiersMap(d.ExprStr.String())
	if err != nil {
		return err
	}

	// Order matters: callers see the error of the first rule that fails.
	checks := []func(map[string]string) error{
		d.isRequiredIdentifiersExist,
		d.isUsingRightOperator,
		d.isAllIdentifiersExistInStruct,
	}
	for _, check := range checks {
		if checkErr := check(operatorsByIdentifier); checkErr != nil {
			return checkErr
		}
	}
	return nil
}

// isRequiredIdentifiersExist reports errMissRequiredIdentifier unless each of
// "refreshed_at", "type" and "service" maps to a non-empty operator in
// identifiersWithOperator. A missing key and an empty operator are treated the
// same way.
func (DeleteAssetExpr) isRequiredIdentifiersExist(identifiersWithOperator map[string]string) error {
	for _, required := range [...]string{"refreshed_at", "type", "service"} {
		if identifiersWithOperator[required] == "" {
			return errMissRequiredIdentifier
		}
	}
	return nil
}

// isUsingRightOperator reports errTypeOrServiceHasWrongOperator unless the
// operators recorded for both "type" and "service" are either "==" or "in"
// (the latter compared after upper-casing, so any letter case is accepted).
func (DeleteAssetExpr) isUsingRightOperator(identifiersWithOperator map[string]string) error {
	for _, identifier := range [...]string{"type", "service"} {
		operator := identifiersWithOperator[identifier]
		if operator != "==" && strings.ToUpper(operator) != "IN" {
			return errTypeOrServiceHasWrongOperator
		}
	}
	return nil
}

// isAllIdentifiersExistInStruct returns an error naming the first key of
// identifiersWithOperator (in the order produced by generichelper.GetMapKeys)
// that is not contained in assetJSONTagsSchema, or nil when every key is found.
func (DeleteAssetExpr) isAllIdentifiersExistInStruct(identifiersWithOperator map[string]string) error {
	for _, name := range generichelper.GetMapKeys(identifiersWithOperator) {
		if generichelper.Contains(assetJSONTagsSchema, name) {
			continue
		}
		return fmt.Errorf("%s is not a valid identifier", name)
	}
	return nil
}
150 changes: 150 additions & 0 deletions core/asset/delete_asset_expr_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package asset_test

import (
"errors"
"testing"

"github.com/goto/compass/core/asset"
"github.com/goto/compass/pkg/queryexpr"
"github.com/stretchr/testify/assert"
)

// TestDeleteAssetExpr_ToQuery verifies that DeleteAssetExpr.ToQuery forwards to
// the wrapped expression: the same source expression is rendered through the
// SQL and Elasticsearch implementations, and a syntactically broken expression
// yields an error with an empty query.
func TestDeleteAssetExpr_ToQuery(t *testing.T) {
	const rawExpr = `name == "John" || service not in ["test1","test2","test3"]`

	cases := []struct {
		name    string
		exprStr queryexpr.ExprStr
		want    string
		wantErr bool
	}{
		{
			name:    "convert to SQL query",
			exprStr: asset.DeleteAssetExpr{ExprStr: queryexpr.SQLExpr(rawExpr)},
			want:    "((name = 'John') OR (service NOT IN ('test1', 'test2', 'test3')))",
		},
		{
			name:    "convert to ES query",
			exprStr: asset.DeleteAssetExpr{ExprStr: queryexpr.ESExpr(rawExpr)},
			want:    `{"query":{"bool":{"should":[{"term":{"name":"John"}},{"bool":{"must_not":[{"terms":{"service.keyword":["test1","test2","test3"]}}]}}]}}}`,
		},
		{
			name:    "got error due to wrong syntax",
			exprStr: asset.DeleteAssetExpr{ExprStr: queryexpr.SQLExpr("findLast(")},
			wantErr: true,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Each case is wrapped once more, so ToQuery is exercised through
			// two layers of DeleteAssetExpr delegation.
			got, err := asset.DeleteAssetExpr{ExprStr: tc.exprStr}.ToQuery()

			gotErr := err != nil
			if gotErr != tc.wantErr {
				t.Errorf("ToQuery() error = %v, wantErr %v", err, tc.wantErr)
				return
			}
			if got != tc.want {
				t.Errorf("ToQuery() got = %v, want %v", got, tc.want)
			}
		})
	}
}

// TestDeleteAssetExpr_Validate covers the rejection paths of
// DeleteAssetExpr.Validate: an unparsable expression, each required identifier
// (refreshed_at, type, service) being absent, and a disallowed operator on
// type or on service.
//
// Each operator case violates exactly one rule so that the asserted error can
// only come from the identifier named in the case.
func TestDeleteAssetExpr_Validate(t *testing.T) {
	tests := []struct {
		name      string
		exprStrFn func() queryexpr.ExprStr
		expectErr error
		wantErr   bool
	}{
		{
			name: "error get identifiers map",
			exprStrFn: func() queryexpr.ExprStr {
				wrongExpr := queryexpr.SQLExpr("findLast(")
				return asset.DeleteAssetExpr{
					ExprStr: wrongExpr,
				}
			},
			expectErr: errors.New("error parsing expression"),
			wantErr:   true,
		},
		{
			name: "error miss refreshed_at not exist",
			exprStrFn: func() queryexpr.ExprStr {
				missRefreshedAt := queryexpr.SQLExpr(`updated_at < "2023-12-12 23:59:59" && type == "table" && service in ["test1","test2","test3"]`)
				return asset.DeleteAssetExpr{
					ExprStr: missRefreshedAt,
				}
			},
			expectErr: errors.New("must exists these identifiers: refreshed_at, type, and service"),
			wantErr:   true,
		},
		{
			name: "error miss type not exist",
			exprStrFn: func() queryexpr.ExprStr {
				missType := queryexpr.SQLExpr(`refreshed_at < "2023-12-12 23:59:59" && service in ["test1","test2","test3"]`)
				return asset.DeleteAssetExpr{
					ExprStr: missType,
				}
			},
			expectErr: errors.New("must exists these identifiers: refreshed_at, type, and service"),
			wantErr:   true,
		},
		{
			name: "error miss service not exist",
			exprStrFn: func() queryexpr.ExprStr {
				missService := queryexpr.SQLExpr(`refreshed_at < "2023-12-12 23:59:59" && type == "table"`)
				return asset.DeleteAssetExpr{
					ExprStr: missService,
				}
			},
			expectErr: errors.New("must exists these identifiers: refreshed_at, type, and service"),
			wantErr:   true,
		},
		{
			name: "error wrong operator for type identifier",
			exprStrFn: func() queryexpr.ExprStr {
				wrongTypeOperator := queryexpr.SQLExpr(`refreshed_at < "2023-12-12 23:59:59" && type != "table" && service in ["test1","test2","test3"]`)
				return asset.DeleteAssetExpr{
					ExprStr: wrongTypeOperator,
				}
			},
			expectErr: errors.New("identifier type and service must be equals (==) or IN operator"),
			wantErr:   true,
		},
		{
			name: "error wrong operator for service identifier",
			exprStrFn: func() queryexpr.ExprStr {
				// type uses an allowed operator here so that only the
				// "service not in" clause can trigger the operator error.
				wrongServiceOperator := queryexpr.SQLExpr(`refreshed_at < "2023-12-12 23:59:59" && type == "table" && service not in ["test1","test2","test3"]`)
				return asset.DeleteAssetExpr{
					ExprStr: wrongServiceOperator,
				}
			},
			expectErr: errors.New("identifier type and service must be equals (==) or IN operator"),
			wantErr:   true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			err := tt.exprStrFn().Validate()
			if (err != nil) != tt.wantErr {
				t.Errorf("Validate() error = %v, wantErr %v", err, tt.wantErr)
			}
			if err != nil {
				// Substring match: the underlying error may carry extra
				// context around the expected message.
				assert.ErrorContains(t, err, tt.expectErr.Error())
			}
		})
	}
}
6 changes: 6 additions & 0 deletions core/asset/delete_assets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package asset

// DeleteAssetsRequest carries the parameters of a delete-assets call.
type DeleteAssetsRequest struct {
	// QueryExpr is the query expression as a raw string.
	// NOTE(review): presumably selects the assets to delete and is validated
	// via DeleteAssetExpr — confirm against the service implementation.
	QueryExpr string
	// DryRun is a flag set by the caller.
	// NOTE(review): presumably requests a count/preview without deleting —
	// confirm against the service implementation.
	DryRun bool
}
2 changes: 2 additions & 0 deletions core/asset/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package asset
//go:generate mockery --name=DiscoveryRepository -r --case underscore --with-expecter --structname DiscoveryRepository --filename discovery_repository.go --output=./mocks
import (
"context"
"github.com/goto/compass/pkg/queryexpr"
)

type DiscoveryRepository interface {
Upsert(context.Context, Asset) error
DeleteByID(ctx context.Context, assetID string) error
DeleteByURN(ctx context.Context, assetURN string) error
DeleteByQueryExpr(ctx context.Context, queryExpr queryexpr.ExprStr) error
Search(ctx context.Context, cfg SearchConfig) (results []SearchResult, err error)
Suggest(ctx context.Context, cfg SearchConfig) (suggestions []string, err error)
GroupAssets(ctx context.Context, cfg GroupConfig) (results []GroupResult, err error)
Expand Down
1 change: 1 addition & 0 deletions core/asset/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ var (
ErrEmptyID = errors.New("asset does not have ID")
ErrProbeExists = errors.New("asset probe already exists")
ErrEmptyURN = errors.New("asset does not have URN")
ErrEmptyQuery = errors.New("query is empty")
ErrUnknownType = errors.New("unknown type")
ErrNilAsset = errors.New("nil asset")
)
Expand Down
1 change: 1 addition & 0 deletions core/asset/lineage.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type LineageRepository interface {
GetGraph(ctx context.Context, urn string, query LineageQuery) (LineageGraph, error)
Upsert(ctx context.Context, urn string, upstreams, downstreams []string) error
DeleteByURN(ctx context.Context, urn string) error
DeleteByURNs(ctx context.Context, urns []string) error
}

type LineageGraph []LineageEdge
Expand Down
Loading

0 comments on commit 071df6f

Please sign in to comment.