From 17c419f33040cf19203b9ca99f36b7495113ed4a Mon Sep 17 00:00:00 2001 From: Julien Perrochet Date: Mon, 5 Aug 2024 20:29:19 +0200 Subject: [PATCH] [scd] OIR upsert: push down cleanup of implicit subscription into CRDB --- pkg/scd/operational_intents_handler.go | 52 ------------------- .../store/cockroach/operational_intents.go | 49 ++++++++++++++--- 2 files changed, 42 insertions(+), 59 deletions(-) diff --git a/pkg/scd/operational_intents_handler.go b/pkg/scd/operational_intents_handler.go index f1972b966..b1bb925f3 100644 --- a/pkg/scd/operational_intents_handler.go +++ b/pkg/scd/operational_intents_handler.go @@ -16,42 +16,6 @@ import ( "github.com/interuss/stacktrace" ) -// subscriptionCanBeRemoved will check if: -// - a previous subscription was attached, -// - if so, if it was an implicit subscription -// - if so, if we can remove it after creating the new implicit subscription -// -// This is to be used in contexts where an implicit subscription may need to be cleaned up. -// NOTE: this should eventually be pushed down to CRDB as part of the queries being executed in the callers of this method. -// -// See https://github.com/interuss/dss/issues/1059 for more details -func subscriptionCanBeRemoved(ctx context.Context, r repos.Repository, subscriptionID *dssmodels.ID) (bool, error) { - // Get the Subscription supporting the OperationalIntent, if one is defined - if subscriptionID != nil { - sub, err := r.GetSubscription(ctx, *subscriptionID) - if err != nil { - return false, stacktrace.Propagate(err, "Unable to get OperationalIntent's Subscription from repo") - } - if sub == nil { - return false, stacktrace.NewError("OperationalIntent's Subscription missing from repo") - } - - if sub.ImplicitSubscription { - // Get the Subscription's dependent OperationalIntents - dependentOps, err := r.GetDependentOperationalIntents(ctx, sub.ID) - if err != nil { - return false, stacktrace.Propagate(err, "Could not find dependent OperationalIntents") - } - if len(dependentOps) == 0 { - return false, stacktrace.NewError("An implicit Subscription had no dependent OperationalIntents") - } else if len(dependentOps) == 1 { - return true, nil - } - } - } - return false, nil -} - // DeleteOperationalIntentReference deletes a single operational intent ref for a given ID at // the specified version. func (a *Server) DeleteOperationalIntentReference(ctx context.Context, req *restapi.DeleteOperationalIntentReferenceRequest, @@ -459,7 +423,6 @@ func (a *Server) upsertOperationalIntentReference(ctx context.Context, authorize return stacktrace.Propagate(err, "Could not get OperationalIntent from repo") } - var previousSubscriptionID *dssmodels.ID if old != nil { if old.Manager != manager { return stacktrace.NewErrorWithCode(dsserr.PermissionDenied, @@ -471,7 +434,6 @@ func (a *Server) upsertOperationalIntentReference(ctx context.Context, authorize } version = int32(old.Version) - previousSubscriptionID = old.SubscriptionID } else { if ovn != "" { return stacktrace.NewErrorWithCode(dsserr.NotFound, "OperationalIntent does not exist and therefore is not version %s", ovn) @@ -481,7 +443,6 @@ func (a *Server) upsertOperationalIntentReference(ctx context.Context, authorize } var sub *scdmodels.Subscription - removePreviousImplicitSubscription := false if subscriptionID.Empty() { // Create an implicit subscription if the implicit subscription params are set: // for situations where these params are required but have not been set, @@ -495,11 +456,6 @@ func (a *Server) upsertOperationalIntentReference(ctx context.Context, authorize } } - removePreviousImplicitSubscription, err = subscriptionCanBeRemoved(ctx, r, previousSubscriptionID) - if err != nil { - return stacktrace.Propagate(err, "Could not determine if previous Subscription can be removed") - } - // Note: parameters for a new implicit subscription have been passed, so we will create // a new implicit subscription even if another subscription was attaches to this OIR before, // (and regardless of whether it was an implicit subscription or not). @@ -705,14 +661,6 @@ func (a *Server) upsertOperationalIntentReference(ctx context.Context, authorize return stacktrace.Propagate(err, "Failed to upsert OperationalIntent in repo") } - // Check if the previously attached subscription should be removed - if removePreviousImplicitSubscription { - err = r.DeleteSubscription(ctx, *previousSubscriptionID) - if err != nil { - return stacktrace.Propagate(err, "Unable to delete previous implicit Subscription") - } - } - // Find Subscriptions that may need to be notified allsubs, err := r.SearchSubscriptions(ctx, notifyVol4) if err != nil { diff --git a/pkg/scd/store/cockroach/operational_intents.go b/pkg/scd/store/cockroach/operational_intents.go index 22bceb9c2..f494d491e 100644 --- a/pkg/scd/store/cockroach/operational_intents.go +++ b/pkg/scd/store/cockroach/operational_intents.go @@ -210,13 +210,48 @@ func (s *repo) DeleteOperationalIntent(ctx context.Context, id dssmodels.ID) err func (s *repo) UpsertOperationalIntent(ctx context.Context, operation *scdmodels.OperationalIntent) (*scdmodels.OperationalIntent, error) { var ( upsertOperationsQuery = fmt.Sprintf(` - UPSERT INTO - scd_operations - (%s) - VALUES - ($1, $2, $3, $4, $5, $6, $7, $8, $9, transaction_timestamp(), $10, $11) - RETURNING - %s`, operationFieldsWithoutPrefix, operationFieldsWithPrefix) + WITH previous_implicit_sub AS ( + -- get the current subscription id if: + -- - it exists + -- - it is implicit + -- - the OIR's subscription is being updated (ie, the new subscription id is different from the old one) + SELECT + scd_subscriptions.id + FROM scd_operations + JOIN scd_subscriptions ON scd_operations.subscription_id = scd_subscriptions.id + WHERE + scd_operations.id = $1 + AND + scd_subscriptions.implicit = true + AND + scd_subscriptions.id != $9 + ), + upserted_oir AS ( + -- actual insertion/update statement + UPSERT INTO + scd_operations + (%s) + VALUES + ($1, $2, $3, $4, $5, $6, $7, $8, $9, transaction_timestamp(), $10, $11) + RETURNING + %s + ), + dependent_oirs AS ( -- NOTE: this sub-query will still return the OIR being mutated (!) + SELECT id + FROM scd_operations + WHERE subscription_id = (SELECT id FROM previous_implicit_sub) + ), + deleted_subscription_id AS ( + -- We are guaranteed to only delete something here if the OIR is being updated. Upon creation + -- previous_implicit_sub will be empty + DELETE FROM scd_subscriptions + WHERE id = (SELECT id FROM previous_implicit_sub) + AND (SELECT COUNT(*) FROM dependent_oirs) = 1 -- NOTE: see above, the OIR being updated is still counted here, hence a value of 1 + RETURNING id + ) + -- return the upserted OIR + SELECT * FROM upserted_oir + `, operationFieldsWithoutPrefix, operationFieldsWithPrefix) ) cids := make([]int64, len(operation.Cells))