diff --git a/Makefile b/Makefile index 4d548f2fe..63c188f42 100644 --- a/Makefile +++ b/Makefile @@ -114,8 +114,10 @@ test-go-units-crdb: cleanup-test-go-units-crdb @docker run -d --name dss-crdb-for-testing -p 26257:26257 -p 8080:8080 cockroachdb/cockroach:v24.1.3 start-single-node --insecure > /dev/null @until [ -n "`docker logs dss-crdb-for-testing | grep 'nodeID'`" ]; do echo "Waiting for CRDB to be ready"; sleep 3; done; go run ./cmds/db-manager/main.go migrate --schemas_dir ./build/db_schemas/rid --db_version latest --cockroach_host localhost + go run ./cmds/db-manager/main.go migrate --schemas_dir ./build/db_schemas/scd --db_version latest --cockroach_host localhost go test -count=1 -v ./pkg/rid/store/cockroach --cockroach_host localhost --cockroach_port 26257 --cockroach_ssl_mode disable --cockroach_user root --cockroach_db_name rid go test -count=1 -v ./pkg/rid/application --cockroach_host localhost --cockroach_port 26257 --cockroach_ssl_mode disable --cockroach_user root --cockroach_db_name rid + go test -count=1 -v ./pkg/scd/store/cockroach --cockroach_host localhost --cockroach_port 26257 --cockroach_ssl_mode disable --cockroach_user root --cockroach_db_name scd @docker stop dss-crdb-for-testing > /dev/null @docker rm dss-crdb-for-testing > /dev/null diff --git a/cmds/db-manager/cleanup/README.md b/cmds/db-manager/cleanup/README.md new file mode 100644 index 000000000..4b1f5cd82 --- /dev/null +++ b/cmds/db-manager/cleanup/README.md @@ -0,0 +1,80 @@ +# DB Cleanup + +## evict +CLI tool that lists and deletes expired entities in the DSS store. +At the time of writing this README, the entities supported by this tool are: +- SCD operational intents; +- SCD subscriptions. + +The usage of this tool is potentially dangerous: inputting wrong parameters may result in loss of data. +As such it is strongly recommended to always review and validate the list of entities identified as expired, and to +ensure that a backup of the data is available before deleting anything using the `--delete` flag. + +### Performance impact +The current implementation of this tool might have a performance impact due notably to lock contention if the number of +entities to be removed is high. With the system under heavy load it might even fail to remove them. That is due to the +fact that the expired entities are all identified and removed within a single transaction: with concurrent competing +transactions succeeding faster, there might be enough failures so that the tool fails. There is no risk of data +inconsistency and the cleanup may just be tried again in that case. + +To avoid this issue: +- perform the cleanup during a low intensity period (e.g. at night); +- iteratively cleanup the entities by starting with a lower TTL and progressively making it higher. + +If this becomes enough of an issue in the future it could be considered implementing batching of removals. + +### Usage +Extract from running `db-manager evict --help`: +``` +List and evict expired entities + +Usage: + db-manager evict [flags] + +Flags: + --delete set this flag to true to delete the expired entities + -h, --help help for evict + --scd_oir set this flag to true to list expired SCD operational intents (default true) + --scd_sub set this flag to true to list expired SCD subscriptions (default true) + --ttl duration time-to-live duration used for determining expiration, defaults to 2*56 days which should be a safe value in most cases (default 2688h0m0s) + +Global Flags: + --cockroach_application_name string application name for tagging the connection to cockroach (default "dss") + --cockroach_db_name string application name for tagging the connection to cockroach (default "dss") + --cockroach_host string cockroach host to connect to + --cockroach_max_retries int maximum number of attempts to retry a query in case of contention, default is 100 (default 100) + --cockroach_port int cockroach port to connect to (default 26257) + --cockroach_ssl_dir string directory to ssl certificates. Must contain files: ca.crt, client..crt, client..key + --cockroach_ssl_mode string cockroach sslmode (default "disable") + --cockroach_user string cockroach user to authenticate as (default "root") + --max_conn_idle_secs int maximum amount of time in seconds a connection may be idle, default is 30 seconds (default 30) + --max_open_conns int maximum number of open connections to the database, default is 4 (default 4) + +``` + +Do note: +- by default expired entities are only listed, not deleted, the flag `--delete` is required for deleting entities; +- expiration of entities is preferably determined through their end times, however when they do not have end times, the last update times are used; +- the flag `--ttl` accepts durations formatted as [Go `time.Duration` strings](https://pkg.go.dev/time#ParseDuration), e.g. `24h`; +- the CockroachDB cluster connection flags are the same as [the `core-service` command](../../core-service/README.md). + +### Examples +The following examples assume a running DSS deployed locally through [the `run_locally.sh` script](../../../build/dev/standalone_instance.md). + +#### List all entities older than 1 week +```shell +docker compose -f docker-compose_dss.yaml -p dss_sandbox exec local-dss-core-service db-manager evict \ + --cockroach_host=local-dss-crdb --ttl=168h +``` + +#### List operational intents older than 1 week +```shell +docker compose -f docker-compose_dss.yaml -p dss_sandbox exec local-dss-core-service db-manager evict \ + --cockroach_host=local-dss-crdb --ttl=168h --scd_oir=true --scd_sub=false +``` + +#### Delete all entities older than 30 days +```shell +docker compose -f docker-compose_dss.yaml -p dss_sandbox exec local-dss-core-service db-manager evict \ + --cockroach_host=local-dss-crdb --ttl=720h --delete +``` diff --git a/cmds/db-manager/cleanup/evict.go b/cmds/db-manager/cleanup/evict.go new file mode 100644 index 000000000..1bcb11fe3 --- /dev/null +++ b/cmds/db-manager/cleanup/evict.go @@ -0,0 +1,129 @@ +package cleanup + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/interuss/dss/pkg/cockroach" + crdbflags "github.com/interuss/dss/pkg/cockroach/flags" + dssmodels "github.com/interuss/dss/pkg/models" + scdmodels "github.com/interuss/dss/pkg/scd/models" + "github.com/interuss/dss/pkg/scd/repos" + scdc "github.com/interuss/dss/pkg/scd/store/cockroach" + "github.com/spf13/cobra" + "github.com/spf13/pflag" +) + +var ( + EvictCmd = &cobra.Command{ + Use: "evict", + Short: "List and evict expired entities", + RunE: evict, + } + flags = pflag.NewFlagSet("evict", pflag.ExitOnError) + listScdOirs = flags.Bool("scd_oir", true, "set this flag to true to list expired SCD operational intents") + listScdSubs = flags.Bool("scd_sub", true, "set this flag to true to list expired SCD subscriptions") + ttl = flags.Duration("ttl", time.Hour*24*112, "time-to-live duration used for determining expiration, defaults to 2*56 days which should be a safe value in most cases") + deleteExpired = flags.Bool("delete", false, "set this flag to true to delete the expired entities") +) + +func init() { + EvictCmd.Flags().AddFlagSet(flags) +} + +func evict(cmd *cobra.Command, _ []string) error { + var ( + ctx = cmd.Context() + threshold = time.Now().Add(-*ttl) + ) + + scdStore, err := getSCDStore(ctx) + if err != nil { + return err + } + + var ( + expiredOpIntents []*scdmodels.OperationalIntent + expiredSubs []*scdmodels.Subscription + ) + action := func(ctx context.Context, r repos.Repository) (err error) { + if *listScdOirs { + expiredOpIntents, err = r.ListExpiredOperationalIntents(ctx, threshold) + if err != nil { + return fmt.Errorf("listing expired operational intents: %w", err) + } + if *deleteExpired { + for _, opIntent := range expiredOpIntents { + if err = r.DeleteOperationalIntent(ctx, opIntent.ID); err != nil { + return fmt.Errorf("deleting expired operational intents: %w", err) + } + } + } + } + + if *listScdSubs { + expiredSubs, err = r.ListExpiredSubscriptions(ctx, threshold) + if err != nil { + return fmt.Errorf("listing expired subscriptions: %w", err) + } + if *deleteExpired { + for _, sub := range expiredSubs { + if err = r.DeleteSubscription(ctx, sub.ID); err != nil { + return fmt.Errorf("deleting expired subscriptions: %w", err) + } + } + } + } + + return nil + } + if err = scdStore.Transact(ctx, action); err != nil { + return fmt.Errorf("failed to execute CRDB transaction: %w", err) + } + + for _, opIntent := range expiredOpIntents { + logExpiredEntity("operational intent", opIntent.ID, threshold, *deleteExpired, opIntent.EndTime != nil) + } + for _, sub := range expiredSubs { + logExpiredEntity("subscription", sub.ID, threshold, *deleteExpired, sub.EndTime != nil) + } + if len(expiredOpIntents) == 0 && len(expiredSubs) == 0 { + log.Printf("no entity older than %s found", threshold.String()) + } else if !*deleteExpired { + log.Printf("no entity was deleted, run the command again with the `--delete` flag to do so") + } + return nil +} + +func getSCDStore(ctx context.Context) (*scdc.Store, error) { + connectParameters := crdbflags.ConnectParameters() + connectParameters.ApplicationName = "db-manager" + connectParameters.DBName = scdc.DatabaseName + scdCrdb, err := cockroach.Dial(ctx, connectParameters) + if err != nil { + logParams := connectParameters + logParams.Credentials.Password = "[REDACTED]" + return nil, fmt.Errorf("failed to connect to database with %+v: %w", logParams, err) + } + + scdStore, err := scdc.NewStore(ctx, scdCrdb) + if err != nil { + return nil, fmt.Errorf("failed to create strategic conflict detection store with %+v: %w", connectParameters, err) + } + return scdStore, nil +} + +func logExpiredEntity(entity string, entityID dssmodels.ID, threshold time.Time, deleted, hasEndTime bool) { + logMsg := "found" + if deleted { + logMsg = "deleted" + } + + expMsg := "last update before %s (missing end time)" + if hasEndTime { + expMsg = "end time before %s" + } + log.Printf("%s %s %s; expired due to %s", logMsg, entity, entityID.String(), fmt.Sprintf(expMsg, threshold.String())) +} diff --git a/cmds/db-manager/main.go b/cmds/db-manager/main.go index 37052232f..37c5b8370 100644 --- a/cmds/db-manager/main.go +++ b/cmds/db-manager/main.go @@ -5,6 +5,7 @@ import ( "log" "os" + "github.com/interuss/dss/cmds/db-manager/cleanup" "github.com/interuss/dss/cmds/db-manager/migration" "github.com/spf13/cobra" ) @@ -19,6 +20,7 @@ var ( func init() { DBManagerCmd.PersistentFlags().AddGoFlagSet(flag.CommandLine) // enable support for flags not yet migrated to using pflag (e.g. crdb flags) DBManagerCmd.AddCommand(migration.MigrationCmd) + DBManagerCmd.AddCommand(cleanup.EvictCmd) } func main() { diff --git a/pkg/scd/repos/repos.go b/pkg/scd/repos/repos.go index ab1cc4a9f..fc84eb74a 100644 --- a/pkg/scd/repos/repos.go +++ b/pkg/scd/repos/repos.go @@ -2,6 +2,8 @@ package repos import ( "context" + "time" + "github.com/golang/geo/s2" dssmodels "github.com/interuss/dss/pkg/models" scdmodels "github.com/interuss/dss/pkg/scd/models" @@ -27,6 +29,10 @@ type OperationalIntent interface { // GetDependentOperationalIntents returns IDs of all operations dependent on // subscription identified by "subscriptionID". GetDependentOperationalIntents(ctx context.Context, subscriptionID dssmodels.ID) ([]dssmodels.ID, error) + + // ListExpiredOperationalIntents lists all operational intents older than the threshold. + // Their age is determined by their end time, or by their update time if they do not have an end time. + ListExpiredOperationalIntents(ctx context.Context, threshold time.Time) ([]*scdmodels.OperationalIntent, error) } // Subscription abstracts subscription-specific interactions with the backing repository. @@ -54,6 +60,10 @@ type Subscription interface { // LockSubscriptionsOnCells locks the subscriptions of interest on specific cells. LockSubscriptionsOnCells(ctx context.Context, cells s2.CellUnion) error + + // ListExpiredSubscriptions lists all subscriptions older than the threshold. + // Their age is determined by their end time, or by their update time if they do not have an end time. + ListExpiredSubscriptions(ctx context.Context, threshold time.Time) ([]*scdmodels.Subscription, error) } type UssAvailability interface { diff --git a/pkg/scd/store/cockroach/operational_intents.go b/pkg/scd/store/cockroach/operational_intents.go index 90b3082c7..dfd410d14 100644 --- a/pkg/scd/store/cockroach/operational_intents.go +++ b/pkg/scd/store/cockroach/operational_intents.go @@ -341,3 +341,29 @@ func (s *repo) GetDependentOperationalIntents(ctx context.Context, subscriptionI return dependentOps, nil } + +// ListExpiredOperationalIntents lists all operational intents older than the threshold. +// Their age is determined by their end time, or by their last update time if they do not have an end time. +func (s *repo) ListExpiredOperationalIntents(ctx context.Context, threshold time.Time) ([]*scdmodels.OperationalIntent, error) { + expiredOpIntentsQuery := fmt.Sprintf(` + SELECT + %s + FROM + scd_operations + WHERE + scd_operations.ends_at IS NOT NULL AND scd_operations.ends_at <= $1 + OR + scd_operations.ends_at IS NULL AND scd_operations.updated_at <= $1 -- use last update time as reference if there is no end time + LIMIT $2`, operationFieldsWithPrefix) + + result, err := s.fetchOperationalIntents( + ctx, s.q, expiredOpIntentsQuery, + threshold, + dssmodels.MaxResultLimit, + ) + if err != nil { + return nil, stacktrace.Propagate(err, "Error fetching Operations") + } + + return result, nil +} diff --git a/pkg/scd/store/cockroach/operational_intents_test.go b/pkg/scd/store/cockroach/operational_intents_test.go new file mode 100644 index 000000000..a0f41c4e9 --- /dev/null +++ b/pkg/scd/store/cockroach/operational_intents_test.go @@ -0,0 +1,141 @@ +package cockroach + +import ( + "context" + "testing" + "time" + + "github.com/golang/geo/s2" + "github.com/interuss/dss/pkg/models" + scdmodels "github.com/interuss/dss/pkg/scd/models" + "github.com/stretchr/testify/require" +) + +var ( + oi1ID = models.ID("00000185-e36d-40be-8d38-beca6ca30000") + oi2ID = models.ID("00000185-e36d-40be-8d38-beca6ca30001") + oi3ID = models.ID("00000185-e36d-40be-8d38-beca6ca30003") + + cells = s2.CellUnion{ + s2.CellID(int64(8768904281496485888)), + s2.CellID(int64(8768904178417270784)), + } + + start1 = time.Date(2024, time.August, 14, 15, 48, 36, 0, time.UTC) + end1 = start1.Add(time.Hour) + start2 = time.Date(2024, time.September, 15, 15, 48, 36, 0, time.UTC) + end2 = start2.Add(time.Hour) + start3 = time.Date(2024, time.September, 16, 15, 48, 36, 0, time.UTC) + end3 = start3.Add(time.Hour) + + altLow, altHigh float32 = 84, 169 +) + +var ( + oi1 = &scdmodels.OperationalIntent{ + ID: oi1ID, + Manager: "unittest", + Version: 1, + State: scdmodels.OperationalIntentStateAccepted, + StartTime: &start1, + EndTime: &end1, + USSBaseURL: "https://dummy.uss", + SubscriptionID: &sub1ID, + AltitudeLower: &altLow, + AltitudeUpper: &altHigh, + Cells: cells, + } + oi2 = &scdmodels.OperationalIntent{ + ID: oi2ID, + Manager: "unittest", + Version: 1, + State: scdmodels.OperationalIntentStateAccepted, + StartTime: &start2, + EndTime: &end2, + USSBaseURL: "https://dummy.uss", + SubscriptionID: &sub2ID, + AltitudeLower: &altLow, + AltitudeUpper: &altHigh, + Cells: cells, + } + oi3 = &scdmodels.OperationalIntent{ + ID: oi3ID, + Manager: "unittest", + Version: 1, + State: scdmodels.OperationalIntentStateAccepted, + StartTime: &start3, + EndTime: &end3, + USSBaseURL: "https://dummy.uss", + SubscriptionID: &sub3ID, + AltitudeLower: &altLow, + AltitudeUpper: &altHigh, + Cells: cells, + } +) + +func TestListExpiredOperationalIntents(t *testing.T) { + var ( + ctx = context.Background() + store, tearDownStore = setUpStore(ctx, t) + ) + require.NotNil(t, store) + defer tearDownStore() + + r, err := store.Interact(ctx) + require.NoError(t, err) + + _, err = r.UpsertSubscription(ctx, sub1) + require.NoError(t, err) + _, err = r.UpsertOperationalIntent(ctx, oi1) + require.NoError(t, err) + + _, err = r.UpsertSubscription(ctx, sub2) + require.NoError(t, err) + _, err = r.UpsertOperationalIntent(ctx, oi2) + require.NoError(t, err) + + _, err = r.UpsertSubscription(ctx, sub3) + require.NoError(t, err) + _, err = r.UpsertOperationalIntent(ctx, oi3) + require.NoError(t, err) + + testCases := []struct { + name string + timeRef time.Time + ttl time.Duration + expired []models.ID + }{{ + name: "none expired, one in close past", + timeRef: time.Date(2024, time.August, 25, 15, 0, 0, 0, time.UTC), + ttl: time.Hour * 24 * 30, + expired: []models.ID{}, + }, { + name: "one recently expired, one current, one in future", + timeRef: time.Date(2024, time.September, 15, 16, 0, 0, 0, time.UTC), + ttl: time.Hour * 24 * 30, + expired: []models.ID{oi1ID}, + }, { + name: "two expired, one in future", + timeRef: time.Date(2024, time.September, 16, 16, 0, 0, 0, time.UTC), + ttl: time.Hour * 2, + expired: []models.ID{oi1ID, oi2ID}, + }, { + name: "all expired", + timeRef: time.Date(2024, time.December, 15, 15, 0, 0, 0, time.UTC), + ttl: time.Hour * 24 * 30, + expired: []models.ID{oi1ID, oi2ID, oi3ID}, + }} + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + threshold := testCase.timeRef.Add(-testCase.ttl) + expired, err := r.ListExpiredOperationalIntents(ctx, threshold) + require.NoError(t, err) + + expiredIDs := make([]models.ID, 0, len(expired)) + for _, expiredOi := range expired { + expiredIDs = append(expiredIDs, expiredOi.ID) + } + require.ElementsMatch(t, expiredIDs, testCase.expired) + }) + } +} diff --git a/pkg/scd/store/cockroach/store_test.go b/pkg/scd/store/cockroach/store_test.go new file mode 100644 index 000000000..7ec03a92e --- /dev/null +++ b/pkg/scd/store/cockroach/store_test.go @@ -0,0 +1,53 @@ +package cockroach + +import ( + "context" + "testing" + + "github.com/interuss/dss/pkg/cockroach" + "github.com/interuss/dss/pkg/cockroach/flags" + "github.com/jonboulle/clockwork" + "github.com/stretchr/testify/require" +) + +var ( + fakeClock = clockwork.NewFakeClock() +) + +func setUpStore(ctx context.Context, t *testing.T) (*Store, func()) { + connectParameters := flags.ConnectParameters() + if connectParameters.Host == "" || connectParameters.Port == 0 { + t.Skip() + } + // Reset the clock for every test. + fakeClock = clockwork.NewFakeClock() + + store, err := newStore(ctx, t, connectParameters) + require.NoError(t, err) + return store, func() { + require.NoError(t, CleanUp(ctx, store)) + require.NoError(t, store.Close()) + } +} + +func newStore(ctx context.Context, t *testing.T, connectParameters cockroach.ConnectParameters) (*Store, error) { + db, err := cockroach.Dial(ctx, connectParameters) + require.NoError(t, err) + + return &Store{ + db: db, + clock: fakeClock, + }, nil +} + +// CleanUp drops all required tables from the store, useful for testing. +func CleanUp(ctx context.Context, s *Store) error { + const query = ` + DELETE FROM scd_subscriptions WHERE id IS NOT NULL; + DELETE FROM scd_operations WHERE id IS NOT NULL; + DELETE FROM scd_constraints WHERE id IS NOT NULL; + DELETE FROM scd_uss_availability WHERE id IS NOT NULL;` + + _, err := s.db.Pool.Exec(ctx, query) + return err +} diff --git a/pkg/scd/store/cockroach/subscriptions.go b/pkg/scd/store/cockroach/subscriptions.go index 7cc7e9149..811e1e451 100644 --- a/pkg/scd/store/cockroach/subscriptions.go +++ b/pkg/scd/store/cockroach/subscriptions.go @@ -377,3 +377,30 @@ func (c *repo) LockSubscriptionsOnCells(ctx context.Context, cells s2.CellUnion) return nil } + +// ListExpiredSubscriptions lists all subscriptions older than the threshold. +// Their age is determined by their end time, or by their update time if they do not have an end time. +func (c *repo) ListExpiredSubscriptions(ctx context.Context, threshold time.Time) ([]*scdmodels.Subscription, error) { + expiredSubsQuery := fmt.Sprintf(` + SELECT + %s + FROM + scd_subscriptions + WHERE + scd_subscriptions.ends_at IS NOT NULL AND scd_subscriptions.ends_at <= $1 + OR + scd_subscriptions.ends_at IS NULL AND scd_subscriptions.updated_at <= $1 -- use last update time as reference if there is no end time + LIMIT $2`, subscriptionFieldsWithPrefix) + + subscriptions, err := c.fetchSubscriptions( + ctx, c.q, expiredSubsQuery, + threshold, + dssmodels.MaxResultLimit, + ) + if err != nil { + return nil, stacktrace.Propagate(err, "Unable to fetch Subscriptions") + } + + return subscriptions, nil + +} diff --git a/pkg/scd/store/cockroach/subscriptions_test.go b/pkg/scd/store/cockroach/subscriptions_test.go new file mode 100644 index 000000000..cd4a11d97 --- /dev/null +++ b/pkg/scd/store/cockroach/subscriptions_test.go @@ -0,0 +1,117 @@ +package cockroach + +import ( + "context" + "testing" + "time" + + "github.com/interuss/dss/pkg/models" + scdmodels "github.com/interuss/dss/pkg/scd/models" + "github.com/stretchr/testify/require" +) + +var ( + sub1ID = models.ID("189ec22f-5e61-418a-940b-36de2d201fd5") + sub2ID = models.ID("78f98cc5-94f3-4c04-8da9-a8398feba3f3") + sub3ID = models.ID("9f0d4575-b275-4a4c-a261-e1e04d324565") +) + +var ( + sub1 = &scdmodels.Subscription{ + ID: sub1ID, + NotificationIndex: 1, + Manager: "unittest", + StartTime: &start1, + EndTime: &end1, + USSBaseURL: "https://dummy.uss", + NotifyForOperationalIntents: true, + NotifyForConstraints: false, + ImplicitSubscription: true, + Cells: cells, + } + sub2 = &scdmodels.Subscription{ + ID: sub2ID, + NotificationIndex: 1, + Manager: "unittest", + StartTime: &start2, + EndTime: &end2, + USSBaseURL: "https://dummy.uss", + NotifyForOperationalIntents: true, + NotifyForConstraints: false, + ImplicitSubscription: true, + Cells: cells, + } + sub3 = &scdmodels.Subscription{ + ID: sub3ID, + NotificationIndex: 1, + Manager: "unittest", + StartTime: &start3, + EndTime: &end3, + USSBaseURL: "https://dummy.uss", + NotifyForOperationalIntents: true, + NotifyForConstraints: false, + ImplicitSubscription: true, + Cells: cells, + } +) + +func TestListExpiredSubscriptions(t *testing.T) { + var ( + ctx = context.Background() + store, tearDownStore = setUpStore(ctx, t) + ) + require.NotNil(t, store) + defer tearDownStore() + + r, err := store.Interact(ctx) + require.NoError(t, err) + + _, err = r.UpsertSubscription(ctx, sub1) + require.NoError(t, err) + + _, err = r.UpsertSubscription(ctx, sub2) + require.NoError(t, err) + + _, err = r.UpsertSubscription(ctx, sub3) + require.NoError(t, err) + + testCases := []struct { + name string + timeRef time.Time + ttl time.Duration + expired []models.ID + }{{ + name: "none expired, one in close past", + timeRef: time.Date(2024, time.August, 25, 15, 0, 0, 0, time.UTC), + ttl: time.Hour * 24 * 30, + expired: []models.ID{}, + }, { + name: "one recently expired, one current, one in future", + timeRef: time.Date(2024, time.September, 15, 16, 0, 0, 0, time.UTC), + ttl: time.Hour * 24 * 30, + expired: []models.ID{sub1ID}, + }, { + name: "two expired, one in future", + timeRef: time.Date(2024, time.September, 16, 16, 0, 0, 0, time.UTC), + ttl: time.Hour * 2, + expired: []models.ID{sub1ID, sub2ID}, + }, { + name: "all expired", + timeRef: time.Date(2024, time.December, 15, 15, 0, 0, 0, time.UTC), + ttl: time.Hour * 24 * 30, + expired: []models.ID{sub1ID, sub2ID, sub3ID}, + }} + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + threshold := testCase.timeRef.Add(-testCase.ttl) + expired, err := r.ListExpiredSubscriptions(ctx, threshold) + require.NoError(t, err) + + expiredIDs := make([]models.ID, 0, len(expired)) + for _, expiredSub := range expired { + expiredIDs = append(expiredIDs, expiredSub.ID) + } + require.ElementsMatch(t, expiredIDs, testCase.expired) + }) + } +}