BatchReceivePolicy.maxNumMessages not working #259

Open
Myron5787 opened this issue Apr 16, 2024 · 8 comments

Comments

@Myron5787

Dear Author,

I am currently utilizing the BatchReceiveAsync method in my project, and I've encountered an issue where the number of messages received exceeds the limit set by BatchReceivePolicy.maxNumMessages. Despite configuring the BatchReceivePolicy to limit the number of messages per batch, the method occasionally returns more messages than specified.

await using var consumer =
    await pulsarClient.NewConsumer(Schema.STRING(Encoding.UTF8))
        .Topic(_topic)
        .SubscriptionName(_subscription)
        .SubscriptionType(SubscriptionType.Shared)
        .SubscriptionInitialPosition(SubscriptionInitialPosition.Latest)
        .AutoUpdatePartitions(true)
        .NegativeAckRedeliveryDelay(TimeSpan.FromMilliseconds(500))
        .BatchReceivePolicy(new BatchReceivePolicy(100, -1L, TimeSpan.FromSeconds(10)))
        .SubscribeAsync();

var messages = await consumer.BatchReceiveAsync(CancellationToken);
var len = messages.Count; // observed: len equals ReceiverQueueSize (default: 1000), not the configured limit of 100

@Lanayx
Member

Lanayx commented Apr 16, 2024

Hi! BatchReceiveAsync is covered by a number of integration tests, and I have never seen it fail. Could you please add a failing integration test in a PR, so that the issue can be identified and fixed?

@Myron5787
Author

I am unable to push the branch due to insufficient permissions.

@Lanayx
Member

Lanayx commented Apr 17, 2024

It's probably your first PR :) You fork the repository, commit to your fork, and then create a PR from your fork to this repository.

@Myron5787
Author

Thank you for your explanation.
I have created the PR. Since I primarily work on C# projects, I would like to ask how the integration tests are generally conducted for this project.

@Lanayx
Member

Lanayx commented Apr 17, 2024

#260: I think you updated the right test; however, the test didn't fail.

@Lanayx
Member

Lanayx commented Apr 17, 2024

If you know of specific failing conditions, such as more threads or more messages, I'd recommend creating a separate test rather than updating the existing one.

@Myron5787
Author

Currently, I am not aware of any additional triggering conditions, and I am not proficient in using F#. If I find any further triggering conditions later, I will report back.

What I have observed is that, in a .NET 8.0 C# environment, the number of messages returned by BatchReceiveAsync equals ReceiverQueueSize rather than the maxNumMessages defined in BatchReceivePolicy.

Pulsar-broker image: apachepulsar/pulsar-all:2.10.2

@Lanayx
Member

Lanayx commented Apr 17, 2024

I see. Unfortunately, without a failing test there is nothing to fix. I think it shouldn't be difficult to write an additional test by copying an existing one and modifying it. If you have any F#-related questions, you can ask in Slack, Discord, or Telegram; we have a friendly community.
