Replies: 2 comments
-
@sakoush I found your issue #1935 in the .NET repo. You are describing the same problem I am trying to solve. Did you manage to find a solution other than sleeping for some time?
-
Hi, this may not be relevant to Go or .NET, but I finally got it working in the Java client by manually seeking to the right offset. Note that in my case the problem was with the auto-offset-reset mechanism, which I couldn't get to run synchronously. The Java client provides several convenience methods like
-
I have a use case where I want to implement synchronous request/response on top of Kafka. For example, when the user sends an HTTP request, I want to produce a message on a specific Kafka input topic that triggers a dataflow, eventually resulting in a response produced on an output topic. I then want to consume the message from the output topic and return the response to the caller.
The workflow is:
HTTP Request -> produce message on input topic -> (consume message from input topic -> app logic -> produce message on output topic) -> consume message from output topic -> HTTP Response.
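For illustration, the ordering in this workflow (get the response side ready first, then produce, then wait) can be sketched in Go without any Kafka client. Everything here is an assumption made for the example: the correlation ID, the `pending` registry, and the `produce` callback are hypothetical stand-ins, not part of any Kafka library, and the original post does not say how responses are matched to requests.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// pending tracks in-flight HTTP requests by a hypothetical correlation ID.
type pending struct {
	mu      sync.Mutex
	waiters map[string]chan string
}

func newPending() *pending {
	return &pending{waiters: make(map[string]chan string)}
}

// register creates the waiter. It must run BEFORE the request is produced,
// otherwise a fast response could arrive with nobody waiting for it.
func (p *pending) register(id string) <-chan string {
	ch := make(chan string, 1) // buffered so resolve never blocks
	p.mu.Lock()
	p.waiters[id] = ch
	p.mu.Unlock()
	return ch
}

// resolve would be called from the output-topic consumer loop.
// It reports whether a waiter existed for the given ID.
func (p *pending) resolve(id, payload string) bool {
	p.mu.Lock()
	ch, ok := p.waiters[id]
	delete(p.waiters, id)
	p.mu.Unlock()
	if ok {
		ch <- payload
	}
	return ok
}

var errTimeout = errors.New("timed out waiting for response")

// roundTrip registers the waiter, produces the request, then waits.
func roundTrip(p *pending, id string, produce func(id string), timeout time.Duration) (string, error) {
	ch := p.register(id)
	produce(id)
	select {
	case resp := <-ch:
		return resp, nil
	case <-time.After(timeout):
		p.mu.Lock()
		delete(p.waiters, id) // drop the waiter so a late response is ignored
		p.mu.Unlock()
		return "", errTimeout
	}
}

func main() {
	p := newPending()
	// Stand-in for "produce on input topic -> app logic -> output topic".
	produce := func(id string) {
		go p.resolve(id, "response for "+id)
	}
	resp, err := roundTrip(p, "req-1", produce, time.Second)
	fmt.Println(resp, err)
}
```

The timeout matters in a sketch like this: if the response never shows up, the HTTP handler should fail the request rather than block forever.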
To implement this, upon receiving the first HTTP request I want to create a consumer on the fly that will consume from the output topic, before producing a message on the input topic. Otherwise, messages on the output topic may be "lost". For application reasons, consumers in my case have a random group.id and auto.offset.reset=latest.
My question is how I can make sure that the consumer is ready before producing messages. I make sure that I call SubscribeTopics before producing messages, but in my tests so far, when there are no committed offsets and Kafka is resetting offsets to latest, messages can be lost and never read by my consumer, because Kafka sometimes thinks that the consumer registered after the messages were produced.
My workaround so far is to sleep for a bit after I create the consumer, to allow Kafka to proceed with the offset reset workflow before I produce messages.
I have also tried to implement logic in a rebalance callback (triggered by a consumer subscribing to a topic), in which I call assign with offset = latest for the topic partition, but this doesn't seem to have fixed my issue.
Hopefully there is a better solution out there than sleep.
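One plausible reading of the behaviour described above is that "latest" is only a symbolic position that gets resolved to a concrete offset when the consumer first fetches, which can be after the response has already been written. The Go sketch below models that with an in-memory log and compares it to pinning a concrete end offset before producing, which is roughly what the "manually seeking" reply amounts to. `partitionLog`, `consumer`, and `offsetLatest` are hypothetical types made up for this example, not a Kafka API. With a real client the end offset would presumably come from something like `QueryWatermarkOffsets` in confluent-kafka-go or `endOffsets` plus `seek` in the Java client; check those against the client documentation before relying on them.

```go
package main

import "fmt"

// partitionLog is an in-memory stand-in for a single Kafka partition.
type partitionLog struct {
	records []string
}

// produce appends a record and returns the offset it was written at.
func (l *partitionLog) produce(v string) int64 {
	l.records = append(l.records, v)
	return int64(len(l.records) - 1)
}

// endOffset is the offset the next produced record will receive.
func (l *partitionLog) endOffset() int64 {
	return int64(len(l.records))
}

// readFrom returns a copy of every record at or after the given offset.
func (l *partitionLog) readFrom(offset int64) []string {
	if offset >= l.endOffset() {
		return nil
	}
	return append([]string(nil), l.records[offset:]...)
}

// offsetLatest is a symbolic position that is resolved on the first poll.
const offsetLatest int64 = -1

// consumer reads from one partitionLog starting at position.
type consumer struct {
	log      *partitionLog
	position int64
}

// poll resolves a symbolic position if needed, then returns new records.
func (c *consumer) poll() []string {
	if c.position == offsetLatest {
		// Lazy resolution: "latest" means latest at the time of this poll.
		c.position = c.log.endOffset()
	}
	recs := c.log.readFrom(c.position)
	c.position += int64(len(recs))
	return recs
}

func main() {
	log := &partitionLog{}
	log.produce("old")

	// Both consumers are created before the response is produced.
	lazy := &consumer{log: log, position: offsetLatest}
	pinned := &consumer{log: log, position: log.endOffset()}

	log.produce("response")

	// The lazy consumer resolves "latest" too late and sees nothing;
	// the pinned consumer starts exactly where the response lands.
	fmt.Println(len(lazy.poll()), len(pinned.poll()))
}
```

The point of pinning is that the starting offset is fixed synchronously, before the request is produced, so no sleep is needed to wait for the reset to happen in the background.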