XINFO GROUPS can't report that Lag is invalid #3185

Open
sancar opened this issue Nov 7, 2024 · 1 comment

Comments

sancar commented Nov 7, 2024

XINFO GROUPS returns the XInfoGroup.Lag field as 0 when Redis reports nil for the lag.
The redis Nil error seems to be ignored on purpose here:
https://github.com/redis/go-redis/blob/master/command.go#L2151

This makes it impossible to tell whether the lag is actually zero or could not be computed.

Expected Behavior

I would expect a value that is distinguishable from 0 when the lag is invalid.

Current Behavior

It returns 0 when redis reports nil for the Lag.

Possible Solution

We can return -1 instead of 0 when redis reports nil for the Lag.
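
To illustrate how a caller might consume such a sentinel, here is a minimal sketch. It assumes the -1 convention proposed above; `lagStatus` is a hypothetical helper and not part of go-redis.

```go
package main

import "fmt"

// lagStatus is a hypothetical helper (not part of go-redis). It interprets a
// lag value under the proposed convention, where -1 means Redis reported nil.
func lagStatus(lag int64) string {
	switch {
	case lag < 0:
		return "unknown"
	case lag == 0:
		return "caught up"
	default:
		return fmt.Sprintf("%d entries behind", lag)
	}
}

func main() {
	// Under the current behavior the first two cases would both arrive as 0.
	for _, lag := range []int64{-1, 0, 2} {
		fmt.Printf("lag=%d: %s\n", lag, lagStatus(lag))
	}
}
```

With the current behavior, the "unknown" and "caught up" cases cannot be told apart, because both arrive as 0.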

Steps to Reproduce

The Redis documentation describes when Redis reports nil for the lag. One such case:

One or more entries between the group's last-delivered-id and the stream's last-generated-id were deleted

https://redis.io/docs/latest/commands/xinfo-groups/

A code example to reproduce:

	r, err := redis.ParseURL("redis://:@127.0.0.1:6379")
	panicOnError(err)
	client := redis.NewClient(r)

	ctx := context.Background()
	client.FlushDB(ctx)

	client.XAdd(ctx, &redis.XAddArgs{Stream: "s", ID: "0-1", Values: []string{"foo", "1"}})
	client.XAdd(ctx, &redis.XAddArgs{Stream: "s", ID: "0-2", Values: []string{"foo", "2"}})
	client.XAdd(ctx, &redis.XAddArgs{Stream: "s", ID: "0-3", Values: []string{"foo", "3"}})

	err = client.XGroupCreate(ctx, "s", "g", "0").Err()
	panicOnError(err)
	err = client.XReadGroup(ctx, &redis.XReadGroupArgs{Group: "g", Consumer: "c", Streams: []string{"s", ">"}, Count: 1, Block: -1, NoAck: false}).Err()
	panicOnError(err)

	client.XDel(ctx, "s", "0-2")

	result, err := client.XInfoGroups(context.Background(), "s").Result()
	panicOnError(err)
	for _, group := range result {
		fmt.Printf("%+v\n", group)
	}

The output it prints:
{Name:g Consumers:1 Pending:1 LastDeliveredID:0-1 EntriesRead:1 Lag:0}

What I expected to see:
{Name:g Consumers:1 Pending:1 LastDeliveredID:0-1 EntriesRead:1 Lag:-1}

Possible Implementation

The code here can be changed to:

group.Lag, err = rd.ReadInt()

// lag: the number of entries in the stream that are still waiting to be delivered
// to the group's consumers, or a NULL (Nil) when that number can't be determined.
if err != nil {
	if err == Nil {
		// Redis could not compute the lag; report -1 instead of 0.
		group.Lag = -1
	} else {
		return err
	}
}

sancar commented Nov 7, 2024

If anyone needs a workaround, I have implemented the following function to use until this issue is fixed:

// CustomXInfoGroup is a custom implementation of XINFO GROUPS that reports -1 for lag when the lag cannot be
// computed on the Redis side (Redis returns nil in this case).
// go-redis reports 0 when Redis returns nil, which makes it impossible to report lag correctly.
// This code can be removed once the related issue is fixed in go-redis: https://github.com/redis/go-redis/issues/3185
func CustomXInfoGroup(ctx context.Context, client *redis.Client, stream string) ([]redis.XInfoGroup, error) {
	res, err := client.Do(ctx, "XINFO", "GROUPS", stream).Result()
	if err != nil {
		return nil, err
	}
	groups, valid := res.([]any)
	if !valid {
		return nil, errors.New("unexpected type from XINFO GROUPS")
	}
	infoGroups := make([]redis.XInfoGroup, len(groups))
	for i, group := range groups {
		infoGroup := &infoGroups[i]

		switch typeCastedGroup := group.(type) {
		case []any:
			nn := len(typeCastedGroup) / 2
			var key string
			for j := 0; j < nn; j++ {
				key, valid = typeCastedGroup[j*2].(string)
				if !valid {
					return nil, errors.New("redis: unexpected type from XINFO GROUPS for GROUP key field")
				}
				value := typeCastedGroup[(j*2)+1]
				err = fillInfoGroup(infoGroup, key, value)
				if err != nil {
					return nil, err
				}
			}
		case map[any]any:
			for key, value := range typeCastedGroup {
				err = fillInfoGroup(infoGroup, key, value)
				if err != nil {
					return nil, err
				}
			}
		default:
			return nil, errors.New("unexpected type from XINFO GROUPS for GROUP")
		}
	}

	return infoGroups, nil
}

func fillInfoGroup(infoGroup *redis.XInfoGroup, key any, value any) error {
	var valid bool
	switch key {
	case "name":
		infoGroup.Name, valid = value.(string)
		if !valid {
			return errors.New("redis: unexpected type from XINFO GROUPS for GROUP Name field")
		}
	case "consumers":
		infoGroup.Consumers, valid = value.(int64)
		if !valid {
			return errors.New("redis: unexpected type from XINFO GROUPS for GROUP Consumers field")
		}
	case "pending":
		infoGroup.Pending, valid = value.(int64)
		if !valid {
			return errors.New("redis: unexpected type from XINFO GROUPS for GROUP Pending field")
		}
	case "last-delivered-id":
		infoGroup.LastDeliveredID, valid = value.(string)
		if !valid {
			return errors.New("redis: unexpected type from XINFO GROUPS for GROUP LastDeliveredID field")
		}
	case "entries-read":
		infoGroup.EntriesRead, valid = value.(int64)
		if !valid {
			return errors.New("redis: unexpected type from XINFO GROUPS for GROUP EntriesRead field")
		}
	case "lag":
		// lag: the number of entries in the stream that are still waiting to be delivered
		// to the group's consumers, or a NULL(Nil) when that number can't be determined.
		if value == nil {
			infoGroup.Lag = -1
		} else {
			infoGroup.Lag, valid = value.(int64)
			if !valid {
				return errors.New("redis: unexpected type from XINFO GROUPS for GROUP Lag field")
			}
		}
	default:
		return fmt.Errorf("redis: unexpected key %q in XINFO GROUPS reply", key)
	}
	return nil
}
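
The lag branch of the workaround can be exercised without a Redis server by feeding it values shaped like the ones the code above expects. This is a sketch only: `decodeLag` is a hypothetical extraction of the "lag" case, and it assumes, as the workaround does, that integer replies arrive as int64 and a nil reply arrives as a Go nil.

```go
package main

import (
	"errors"
	"fmt"
)

// decodeLag is a hypothetical helper mirroring the "lag" case above:
// nil becomes -1, an int64 passes through, anything else is an error.
func decodeLag(value any) (int64, error) {
	if value == nil {
		return -1, nil
	}
	lag, ok := value.(int64)
	if !ok {
		return 0, errors.New("unexpected type for lag field")
	}
	return lag, nil
}

func main() {
	// Simulated field values: a computable lag, a nil lag, and a malformed one.
	for _, v := range []any{int64(2), nil, "oops"} {
		lag, err := decodeLag(v)
		fmt.Println(lag, err)
	}
}
```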
