Skip to content

Commit

Permalink
Merge pull request #22 from castaneai/fix/deindex-tickets-with-ttl
Browse files Browse the repository at this point in the history
Tickets deleted by TTL should also be deleted from the ticket index.
  • Loading branch information
castaneai authored Feb 2, 2024
2 parents f2efdd8 + 79e315d commit e531cbc
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 5 deletions.
22 changes: 21 additions & 1 deletion pkg/statestore/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package statestore
import (
"context"
"fmt"
"log"
"strconv"
"strings"
"time"

"github.com/redis/rueidis"
Expand Down Expand Up @@ -155,6 +157,9 @@ func (s *RedisStore) GetAssignment(ctx context.Context, ticketID string) (*pb.As
return s.getAssignment(ctx, redis, ticketID)
}

// The ActiveTicketIDs may still contain the ID of a ticket that was deleted by TTL.
// This is because the ticket index and Ticket data are stored in separate keys.
// The following `GetTicket` or `GetTickets` call will resolve this inconsistency.
func (s *RedisStore) GetActiveTicketIDs(ctx context.Context, limit int64) ([]string, error) {
// Acquire a lock to prevent multiple backends from fetching the same Ticket.
// In order to avoid race conditions with other Ticket Index changes, get tickets and set them to pending state should be done atomically.
Expand Down Expand Up @@ -276,6 +281,8 @@ func (s *RedisStore) getTicket(ctx context.Context, ticketID string) (*pb.Ticket
resp := s.client.Do(ctx, s.client.B().Get().Key(redisKeyTicketData(s.opts.keyPrefix, ticketID)).Build())
if err := resp.Error(); err != nil {
if rueidis.IsRedisNil(err) {
// If the ticket has been deleted by TTL, it is deleted from the ticket index as well.
_ = s.deIndexTickets(ctx, []string{ticketID})
return nil, ErrTicketNotFound
}
return nil, fmt.Errorf("failed to get ticket: %w", err)
Expand Down Expand Up @@ -341,9 +348,12 @@ func (s *RedisStore) getTickets(ctx context.Context, ticketIDs []string) ([]*pb.
}

tickets := make([]*pb.Ticket, 0, len(keys))
for _, resp := range mgetMap {
var ticketIDsDeleted []string
for key, resp := range mgetMap {
if err := resp.Error(); err != nil {
if rueidis.IsRedisNil(err) {
// If the ticket has been deleted by TTL, it is deleted from the ticket index as well.
ticketIDsDeleted = append(ticketIDsDeleted, ticketIDFromRedisKey(s.opts.keyPrefix, key))
continue
}
return nil, fmt.Errorf("failed to get tickets: %w", err)
Expand All @@ -358,6 +368,12 @@ func (s *RedisStore) getTickets(ctx context.Context, ticketIDs []string) ([]*pb.
}
tickets = append(tickets, ticket)
}
log.Printf("to be de-indexed: %v", ticketIDsDeleted)
if len(ticketIDsDeleted) > 0 {
if err := s.deIndexTickets(ctx, ticketIDsDeleted); err != nil {
return nil, fmt.Errorf("failed to de-index tickets which have been deleted: %w", err)
}
}
return tickets, nil
}

Expand Down Expand Up @@ -469,6 +485,10 @@ func redisKeyAssignmentData(prefix, ticketID string) string {
return fmt.Sprintf("%sassign:%s", prefix, ticketID)
}

func ticketIDFromRedisKey(prefix, key string) string {
return strings.TrimPrefix(key, prefix)
}

// difference returns the elements in `a` that aren't in `b`.
// https://stackoverflow.com/a/45428032
func difference(a, b []string) []string {
Expand Down
59 changes: 55 additions & 4 deletions pkg/statestore/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,18 +151,61 @@ func TestTicketTTL(t *testing.T) {
store := newTestRedisStore(t, mr.Addr(), WithTicketTTL(ticketTTL))
ctx := context.Background()

mustCreateTicket := func(id string) {
require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: id}))
ticket, err := store.GetTicket(ctx, id)
require.NoError(t, err)
require.Equal(t, id, ticket.Id)
}

_, err := store.GetTicket(ctx, "test1")
require.Error(t, err, ErrTicketNotFound)

require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "test1"}))
t1, err := store.GetTicket(ctx, "test1")
require.NoError(t, err)
require.Equal(t, "test1", t1.Id)
mustCreateTicket("test1")

// "test1" has been deleted by TTL
mr.FastForward(ticketTTL + 1*time.Second)

_, err = store.GetTicket(ctx, "test1")
require.Error(t, err, ErrTicketNotFound)

activeTicketIDs, err := store.GetActiveTicketIDs(ctx, defaultFetchTicketsLimit)
require.NoError(t, err)
require.NotContains(t, activeTicketIDs, "test1")

mustCreateTicket("test2")
mustCreateTicket("test3")

ts, err := store.GetTickets(ctx, []string{"test2", "test3"})
require.NoError(t, err)
require.ElementsMatch(t, ticketIDs(ts), []string{"test2", "test3"})

// "test2" and "test3" have been deleted by TTL
mr.FastForward(ticketTTL + 1*time.Second)

// "test4" remains as it has not passed TTL
mustCreateTicket("test4")

// The ActiveTicketIDs may still contain the ID of a ticket that was deleted by TTL.
// This is because the ticket index and Ticket data are stored in separate keys.

// In this example, "test2" and "test3" were deleted by TTL, but remain in the ticket index.
activeTicketIDs, err = store.GetActiveTicketIDs(ctx, defaultFetchTicketsLimit)
require.NoError(t, err)
require.ElementsMatch(t, activeTicketIDs, []string{"test2", "test3", "test4"})
err = store.ReleaseTickets(ctx, []string{"test2", "test3", "test4"})
require.NoError(t, err)

// `GetTickets` call will resolve inconsistency.
ts, err = store.getTickets(ctx, []string{"test2", "test3", "test4"})
require.NoError(t, err)
require.ElementsMatch(t, ticketIDs(ts), []string{"test4"})

// Because we called GetTickets, "test2" and "test3" which were deleted by TTL,
// were deleted from the ticket index as well.
activeTicketIDs, err = store.GetActiveTicketIDs(ctx, defaultFetchTicketsLimit)
require.NoError(t, err)
require.ElementsMatch(t, activeTicketIDs, []string{"test4"})
}

func TestConcurrentFetchActiveTickets(t *testing.T) {
Expand Down Expand Up @@ -257,3 +300,11 @@ func chunkBy[T any](items []T, chunkSize int) (chunks [][]T) {
}
return append(chunks, items)
}

func ticketIDs(tickets []*pb.Ticket) []string {
ids := make([]string, 0, len(tickets))
for _, ticket := range tickets {
ids = append(ids, ticket.Id)
}
return ids
}

0 comments on commit e531cbc

Please sign in to comment.