Skip to content

Commit

Permalink
feat: upsert event types (#2201)
Browse files Browse the repository at this point in the history
  • Loading branch information
jirevwe authored Dec 6, 2024
1 parent f80f9d1 commit 8c51b79
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 10 deletions.
111 changes: 111 additions & 0 deletions api/public_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2656,6 +2656,117 @@ func (s *PublicSubscriptionIntegrationTestSuite) Test_UpdateSubscription() {
require.Equal(s.T(), subscription.RetryConfig.Duration, dbSub.RetryConfig.Duration)
}

func (s *PublicSubscriptionIntegrationTestSuite) Test_CreateSubscription_CreatesEventTypes() {
// Arrange
endpointID := ulid.Make().String()
expectedStatusCode := http.StatusCreated

// Just Before
endpoint, err := testdb.SeedEndpoint(s.ConvoyApp.A.DB, s.DefaultProject, endpointID, "", "", false, datastore.ActiveEndpointStatus)
require.NoError(s.T(), err)

bodyStr := fmt.Sprintf(`{
"name": "test-sub",
"type": "incoming",
"project_id": "%s",
"endpoint_id": "%s",
"filter_config": {
"event_types": [
"user.created",
"user.updated"
]
}
}`, s.DefaultProject.UID, endpoint.UID)

// Arrange Request
url := fmt.Sprintf("/api/v1/projects/%s/subscriptions", s.DefaultProject.UID)
body := serialize(bodyStr)
req := createRequest(http.MethodPost, url, s.APIKey, body)
w := httptest.NewRecorder()

// Act
s.Router.ServeHTTP(w, req)

// Assert
require.Equal(s.T(), expectedStatusCode, w.Code)

// Deep Assert
var subscription *datastore.Subscription
parseResponse(s.T(), w.Result(), &subscription)

// Verify subscription was created
subRepo := postgres.NewSubscriptionRepo(s.ConvoyApp.A.DB, nil)
dbSub, err := subRepo.FindSubscriptionByID(context.Background(), s.DefaultProject.UID, subscription.UID)
require.NoError(s.T(), err)
require.Equal(s.T(), 2, len(dbSub.FilterConfig.EventTypes))

// Verify event types were created
query := `SELECT COUNT(*) FROM convoy.event_types
WHERE project_id = $1 AND name IN ('user.created', 'user.updated')`
var count int
err = s.ConvoyApp.A.DB.GetDB().QueryRowxContext(context.Background(), query, s.DefaultProject.UID).Scan(&count)
require.NoError(s.T(), err)
require.Equal(s.T(), 2, count)
}

func (s *PublicSubscriptionIntegrationTestSuite) Test_UpdateSubscription_CreatesNewEventTypes() {
// Arrange
endpointID := ulid.Make().String()
expectedStatusCode := http.StatusAccepted

// Just Before
endpoint, err := testdb.SeedEndpoint(s.ConvoyApp.A.DB, s.DefaultProject, endpointID, "", "", false, datastore.ActiveEndpointStatus)
require.NoError(s.T(), err)

// Create initial subscription
sub, err := testdb.SeedSubscription(s.ConvoyApp.A.DB, s.DefaultProject, ulid.Make().String(),
datastore.OutgoingProject, &datastore.Source{}, endpoint,
&datastore.RetryConfiguration{}, &datastore.AlertConfiguration{},
&datastore.FilterConfiguration{
EventTypes: []string{"initial.event"},
})
require.NoError(s.T(), err)

bodyStr := fmt.Sprintf(`{
"name": "test-sub",
"filter_config": {
"event_types": [
"user.deleted",
"user.churned"
]
}
}`)

// Arrange Request
url := fmt.Sprintf("/api/v1/projects/%s/subscriptions/%s", s.DefaultProject.UID, sub.UID)
body := serialize(bodyStr)
req := createRequest(http.MethodPut, url, s.APIKey, body)
w := httptest.NewRecorder()

// Act
s.Router.ServeHTTP(w, req)

// Assert
require.Equal(s.T(), expectedStatusCode, w.Code)

// Deep Assert
var subscription *datastore.Subscription
parseResponse(s.T(), w.Result(), &subscription)

// Verify subscription was updated
subRepo := postgres.NewSubscriptionRepo(s.ConvoyApp.A.DB, nil)
dbSub, err := subRepo.FindSubscriptionByID(context.Background(), s.DefaultProject.UID, subscription.UID)
require.NoError(s.T(), err)
require.Equal(s.T(), 2, len(dbSub.FilterConfig.EventTypes))

// Verify new event types were created
query := `SELECT COUNT(*) FROM convoy.event_types WHERE project_id = $1 AND name IN ('user.deleted', 'user.churned')`
var count int
err = s.ConvoyApp.A.DB.GetDB().QueryRowxContext(context.Background(), query, s.DefaultProject.UID).Scan(&count)
require.NoError(s.T(), err)
require.Equal(s.T(), 2, count)
}

func TestPublicSubscriptionIntegrationTestSuite(t *testing.T) {
suite.Run(t, new(PublicSubscriptionIntegrationTestSuite))
}
Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ var DefaultConfiguration = Configuration{
},
},
Dispatcher: DispatcherConfiguration{
InsecureSkipVerify: true,
InsecureSkipVerify: false,
AllowList: []string{"0.0.0.0/0", "::/0"},
BlockList: []string{"127.0.0.0/8", "::1/128"},
},
Expand Down
6 changes: 3 additions & 3 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func TestLoadConfig(t *testing.T) {
},
},
Dispatcher: DispatcherConfiguration{
InsecureSkipVerify: true,
InsecureSkipVerify: false,
AllowList: []string{"0.0.0.0/0", "::/0"},
BlockList: []string{"127.0.0.0/8", "::1/128"},
},
Expand Down Expand Up @@ -271,7 +271,7 @@ func TestLoadConfig(t *testing.T) {
},
},
Dispatcher: DispatcherConfiguration{
InsecureSkipVerify: true,
InsecureSkipVerify: false,
AllowList: []string{"0.0.0.0/0", "::/0"},
BlockList: []string{"127.0.0.0/8", "::1/128"},
},
Expand Down Expand Up @@ -362,7 +362,7 @@ func TestLoadConfig(t *testing.T) {
},
},
Dispatcher: DispatcherConfiguration{
InsecureSkipVerify: true,
InsecureSkipVerify: false,
AllowList: []string{"0.0.0.0/0", "::/0"},
BlockList: []string{"127.0.0.0/8", "::1/128"},
},
Expand Down
80 changes: 75 additions & 5 deletions database/postgres/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"errors"
"fmt"
"github.com/oklog/ulid/v2"
"math"
"time"

Expand Down Expand Up @@ -259,6 +260,9 @@ const (
deleted_at = NOW()
WHERE id = $1 AND project_id = $2;
`

upsertSubscriptionEventTypes = `
INSERT INTO convoy.event_types (id, name, project_id, description, category) VALUES (:id, :name, :project_id, :description, :category) on conflict do nothing;`
)

var (
Expand Down Expand Up @@ -481,7 +485,18 @@ func (s *subscriptionRepo) CreateSubscription(ctx context.Context, projectID str

fc.Filter.IsFlattened = true // this is just a flag so we can identify old records

result, err := s.db.ExecContext(
tx, err := s.db.BeginTxx(ctx, nil)
if err != nil {
return err
}
defer func(tx *sqlx.Tx) {
innerErr := tx.Rollback()
if innerErr != nil {
err = innerErr
}
}(tx)

result, err := tx.ExecContext(
ctx, createSubscription, subscription.UID,
subscription.Name, subscription.Type, subscription.ProjectID,
endpointID, deviceID, sourceID,
Expand All @@ -503,14 +518,36 @@ func (s *subscriptionRepo) CreateSubscription(ctx context.Context, projectID str
}

_subscription := &datastore.Subscription{}
err = s.db.QueryRowxContext(ctx, fmt.Sprintf(fetchSubscriptionByID, "s.id", "s.project_id"), subscription.UID, projectID).StructScan(_subscription)
err = tx.QueryRowxContext(ctx, fmt.Sprintf(fetchSubscriptionByID, "s.id", "s.project_id"), subscription.UID, projectID).StructScan(_subscription)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return datastore.ErrSubscriptionNotFound
}
return err
}

eventTypesSlice := make([]*datastore.ProjectEventType, len(subscription.FilterConfig.EventTypes))
for i := range subscription.FilterConfig.EventTypes {
eventTypesSlice[i] = &datastore.ProjectEventType{
UID: ulid.Make().String(),
Name: subscription.FilterConfig.EventTypes[i],
ProjectId: subscription.ProjectID,
Description: "",
Category: "",
}
}

// create event types for each subscription
_, err = tx.NamedExecContext(ctx, upsertSubscriptionEventTypes, eventTypesSlice)
if err != nil {
return err
}

err = tx.Commit()
if err != nil {
return err
}

nullifyEmptyConfig(_subscription)
*subscription = *_subscription

Expand All @@ -520,7 +557,7 @@ func (s *subscriptionRepo) CreateSubscription(ctx context.Context, projectID str
return err
}

return nil
return err
}

func (s *subscriptionRepo) UpdateSubscription(ctx context.Context, projectID string, subscription *datastore.Subscription) error {
Expand All @@ -546,6 +583,17 @@ func (s *subscriptionRepo) UpdateSubscription(ctx context.Context, projectID str

fc.Filter.IsFlattened = true // this is just a flag so we can identify old records

tx, err := s.db.BeginTxx(ctx, nil)
if err != nil {
return err
}
defer func(tx *sqlx.Tx) {
innerErr := tx.Rollback()
if innerErr != nil {
err = innerErr
}
}(tx)

result, err := s.db.ExecContext(
ctx, updateSubscription, subscription.UID, projectID,
subscription.Name, subscription.EndpointID, sourceID,
Expand All @@ -567,14 +615,36 @@ func (s *subscriptionRepo) UpdateSubscription(ctx context.Context, projectID str
}

_subscription := &datastore.Subscription{}
err = s.db.QueryRowxContext(ctx, fmt.Sprintf(fetchSubscriptionByID, "s.id", "s.project_id"), subscription.UID, projectID).StructScan(_subscription)
err = tx.QueryRowxContext(ctx, fmt.Sprintf(fetchSubscriptionByID, "s.id", "s.project_id"), subscription.UID, projectID).StructScan(_subscription)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return datastore.ErrSubscriptionNotFound
}
return err
}

eventTypesSlice := make([]*datastore.ProjectEventType, len(subscription.FilterConfig.EventTypes))
for i := range subscription.FilterConfig.EventTypes {
eventTypesSlice[i] = &datastore.ProjectEventType{
UID: ulid.Make().String(),
Name: subscription.FilterConfig.EventTypes[i],
ProjectId: subscription.ProjectID,
Description: "",
Category: "",
}
}

// create event types for each subscription
_, err = tx.NamedExecContext(ctx, upsertSubscriptionEventTypes, eventTypesSlice)
if err != nil {
return err
}

err = tx.Commit()
if err != nil {
return err
}

nullifyEmptyConfig(_subscription)
*subscription = *_subscription

Expand All @@ -584,7 +654,7 @@ func (s *subscriptionRepo) UpdateSubscription(ctx context.Context, projectID str
return err
}

return nil
return err
}

func (s *subscriptionRepo) LoadSubscriptionsPaged(ctx context.Context, projectID string, filter *datastore.FilterBy, pageable datastore.Pageable) ([]datastore.Subscription, datastore.PaginationData, error) {
Expand Down
3 changes: 2 additions & 1 deletion net/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ func BlockListOption(blockList []string) DispatcherOption {
}
}

// InsecureSkipVerifyOption allow self-signed certificates if set to false which is susceptible to MITM attacks.
// InsecureSkipVerifyOption allow self-signed certificates
// to be used if set to true but is susceptible to Man In The Middle attacks.
func InsecureSkipVerifyOption(insecureSkipVerify bool) DispatcherOption {
return func(d *Dispatcher) error {
if insecureSkipVerify {
Expand Down
6 changes: 6 additions & 0 deletions sql/1733488138.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- +migrate Up
create unique index if not exists idx_event_types_name_project_id
on convoy.event_types (project_id, name);

-- +migrate Down
drop index if exists convoy.idx_event_types_name_project_id;

0 comments on commit 8c51b79

Please sign in to comment.