You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
partition, modify records it consumed, and produce back to a new topic. Streams
So I am manually managing transactions. I am using NewGroupTransactSession() and then calling c.kafka.Begin(). It returns the error below.
2024/12/30 16:19:17 INFO immediate metadata update triggered consumerId=0 why="querying metadata for consumer initialization"
2024/12/30 16:19:18 INFO beginning to manage the group lifecycle consumerId=0 group=bulk-indexer
2024/12/30 16:19:18 INFO joining group consumerId=0 group=bulk-indexer
2024/12/30 16:19:18 INFO join returned MemberIDRequired, rejoining with response's MemberID consumerId=0 group=bulk-indexer member_id=kgo-456a2c14b405412c9e985726d3466449
2024/12/30 16:19:18 INFO joined, balancing group consumerId=0 group=bulk-indexer member_id=kgo-456a2c14b405412c9e985726d3466449 instance_id=<nil> generation=1 balance_protocol=cooperative-sticky leader=true
2024/12/30 16:19:18 INFO balancing group as leader consumerId=0
2024/12/30 16:19:18 INFO balance group member consumerId=0 id=kgo-456a2c14b405412c9e985726d3466449 interests="interested topics: [radicl.bulk.indexer], previously owned: "
2024/12/30 16:19:18 INFO balanced consumerId=0 plan="kgo-456a2c14b405412c9e985726d3466449{radicl.bulk.indexer[0 1]}"
2024/12/30 16:19:18 INFO syncing consumerId=0 group=bulk-indexer protocol_type=consumer protocol=cooperative-sticky
2024/12/30 16:19:18 INFO synced consumerId=0 group=bulk-indexer assigned="radicl.bulk.indexer[0 1]"
2024/12/30 16:19:18 INFO new group session begun consumerId=0 group=bulk-indexer added="radicl.bulk.indexer[0 1]" lost=""
2024/12/30 16:19:18 INFO beginning heartbeat loop consumerId=0 group=bulk-indexer
2024/12/30 16:19:18 INFO assigning partitions consumerId=0 why="newly fetched offsets for group bulk-indexer" how="assigning everything new, keeping current assignment" input="radicl.bulk.indexer[1{-2 e-1 ce0} 0{-2 e-1 ce0}]"
2024/12/30 16:19:18 INFO beginning transact session consumerId=0
2024/12/30 16:19:18 INFO initializing producer id consumerId=0
2024/12/30 16:19:18 INFO producer id initialization errored consumerId=0 err="UNKNOWN_SERVER_ERROR: The server experienced an unexpected error when processing the request."
2024/12/30 16:19:18 INFO unable to begin transaction due to unrecoverable producer id error consumerId=0 err="UNKNOWN_SERVER_ERROR: The server experienced an unexpected error when processing the request."
2024/12/30 16:19:18 ERROR Unable to start new session: consumerId=0 error="producer ID has a fatal, unrecoverable error, err: UNKNOWN_SERVER_ERROR: The server experienced an unexpected error when processing the request."
consumer_test.go:62:
Error Trace: /Users/benaldrich/code/platform/foundation/bulk-indexer/indexer/consumer_test.go:62
Error: Expected nil, but got: &fmt.wrapError{msg:"producer ID has a fatal, unrecoverable error, err: UNKNOWN_SERVER_ERROR: The server experienced an unexpected error when processing the request.", err:(*kerr.Error)(0x1036ac5a0)}
Test: TestPollWithCancel
The producerId in question is a simple string with the name of the service. I have also attempted using a uuid and I get the same error. Does the kfake broker not support manual transactions?
If I avoid creating a transaction and call poll without the transaction it work correctly. The transactions work when using a real broker.
The text was updated successfully, but these errors were encountered:
Not supported -- I last tried working on it in Sept 2023. If you want to help, the branch is https://github.com/twmb/franz-go/tree/kfake_txns. I'd like help, otherwise this is very low on the priority list (maybe this year???). It's a bit challenging 😅
I am attempting to write tests around my consumer, to avoid dropping messages I am following
franz-go/docs/transactions.md
Line 37 in 293b7c4
So I am manually managing transactions. I am using NewGroupTransactSession() and then calling c.kafka.Begin(). It returns the error below.
The producerId in question is a simple string with the name of the service. I have also attempted using a uuid and I get the same error. Does the kfake broker not support manual transactions?
If I avoid creating a transaction and call poll without the transaction it work correctly. The transactions work when using a real broker.
The text was updated successfully, but these errors were encountered: