Custom starting offset #54

Open
sajal opened this issue Feb 16, 2015 · 5 comments

@sajal

sajal commented Feb 16, 2015

Hi all,

Currently I do some event-stream-style processing using homebrew Go code that passes messages over zmq. There is no resiliency/failover, and no partitioning of messages like Kafka provides.

I am looking into using Kafka + go_kafka_client and have some specific questions.

On startup (and whenever a repartition happens) I want to start at an offset from about x minutes ago (it does not have to be exact; I can deal with a few extra messages). It appears sarama can do this. How would I go about using the work distribution goodness of go_kafka_client while plugging in some custom starting-offset logic? This should ignore whether I've already consumed a specific message, or whether there are unconsumed messages from more than x minutes ago.

-Sajal

@joestein
Contributor

joestein commented Mar 2, 2015

We could probably start storing an offset for every minute or so, so that you can rewind to any minute in the stream. We are refactoring that part of the code over the coming weeks, so maybe we can hook up something that works best for this.

@sajal
Author

sajal commented Mar 2, 2015

For my use case, I could manage the offset myself if there were a way to specify which starting offset to use when starting up (or re-balancing).

Logic like "n offsets ago" is also fine: say the newest offset is x when starting up (or rebalancing); the consumer could then process from offset x - n, where n is something configured.
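The x - n rule can be sketched as a small pure function. `StartOffset` is a hypothetical helper, not part of go_kafka_client or sarama; it assumes the caller already knows the oldest and newest offsets available for the partition, and it clamps the result so the consumer never asks for an offset that has already been deleted.

```go
package main

import "fmt"

// StartOffset returns newest - n, clamped to the range [oldest, newest].
// oldest and newest are the first and last offsets still available in the
// partition; n is the configured number of messages to rewind.
func StartOffset(oldest, newest, n int64) int64 {
	if n < 0 {
		n = 0 // a negative rewind makes no sense; start at the newest offset
	}
	start := newest - n
	if start < oldest {
		start = oldest // not enough history retained; start at the oldest offset
	}
	return start
}

func main() {
	fmt.Println(StartOffset(0, 1000, 50))   // enough history: 950
	fmt.Println(StartOffset(980, 1000, 50)) // clamped to the oldest offset: 980
}
```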

@baconalot

+1. A common use case I have is priming a job with some historic data, but not the complete available dataset.

@baconalot

Only now do I see the problems here. With a single consumer pid it would work fine if we had something in the config like ForceSetPartitionStart = 1234.
The flow would then be:
- pid 333 (consumer.go) start
- set offsets to 1234
- start consuming

But... with multiple pids (Chronos, anyone?):
- pid 333 (consumer.go) start
- (333) set offsets to 1234
- (333) start consuming
- pid 334 (consumer.go) start
- (334) set offsets to 1234
- (334) start consuming
- (333) -> reprocess messages

Also, how would this be configured? It can't be a single int, since the conf's group can contain any number of topics/partitions.
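One possible shape for this, sketched under assumptions: the forced offsets are keyed by topic/partition, and a shared marker ensures the forced offset is applied only once per group, so a second pid starting later does not rewind the first. Everything here is hypothetical (`TopicPartition`, `ForcedStarts`, `OffsetStore`, `ForceOnce`), and the in-memory store merely stands in for whatever shared coordinator the group actually uses.

```go
package main

import (
	"fmt"
	"sync"
)

// TopicPartition is a local stand-in for a topic/partition key.
type TopicPartition struct {
	Topic     string
	Partition int32
}

// ForcedStarts is a hypothetical config shape: one forced offset per topic/partition.
type ForcedStarts map[TopicPartition]int64

// OffsetStore is an in-memory stand-in for one consumer group's shared offset storage.
type OffsetStore struct {
	mu        sync.Mutex
	committed map[TopicPartition]int64
	forced    map[TopicPartition]bool
}

func NewOffsetStore() *OffsetStore {
	return &OffsetStore{
		committed: make(map[TopicPartition]int64),
		forced:    make(map[TopicPartition]bool),
	}
}

// ForceOnce sets the committed offset for tp the first time it is called for
// that partition and reports whether it did so. Later calls are no-ops.
func (s *OffsetStore) ForceOnce(tp TopicPartition, offset int64) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.forced[tp] {
		return false
	}
	s.forced[tp] = true
	s.committed[tp] = offset
	return true
}

// Commit records consumer progress for tp.
func (s *OffsetStore) Commit(tp TopicPartition, offset int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.committed[tp] = offset
}

// Committed returns the last committed offset for tp.
func (s *OffsetStore) Committed(tp TopicPartition) int64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.committed[tp]
}

func main() {
	tp := TopicPartition{Topic: "some_topic_name", Partition: 0}
	cfg := ForcedStarts{tp: 1234}
	store := NewOffsetStore()

	// pid 333 starts first: the forced offset is applied, then it makes progress.
	fmt.Println("pid 333 forced:", store.ForceOnce(tp, cfg[tp]))
	store.Commit(tp, 1300)

	// pid 334 starts later: the forced offset is skipped, progress is kept.
	fmt.Println("pid 334 forced:", store.ForceOnce(tp, cfg[tp]))
	fmt.Println("committed:", store.Committed(tp))
}
```

The marker would have to be cleared deliberately whenever a fresh rewind is wanted, which is a design decision this sketch leaves open.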

For now I am just going to use a helper that can set a commit manually for me:

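    // Helper fragment: manually commit an offset for one topic/partition so the
    // consumer group resumes from it. Needs "log" and go_kafka_client imported.
    zkconfig := go_kafka_client.NewZookeeperConfig()
    zkcoord := go_kafka_client.NewZookeeperCoordinator(zkconfig)

    // Connect to ZooKeeper; stop on failure instead of discarding the error.
    err := zkcoord.Connect()
    if err != nil {
        log.Fatal(err)
    }

    tp := go_kafka_client.TopicAndPartition{}
    tp.Topic = "some_topic_name"
    tp.Partition = 213
    // Store 123 as the committed offset for group "some_group_id".
    err = zkcoord.CommitOffset("some_group_id", &tp, 123)
    if err != nil {
        log.Fatal(err)
    }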

@sajal
Author

sajal commented Apr 15, 2015

Isn't an offset tied to a partition rather than a topic? Or do you mean processing a single partition in multiple pids?

As long as each partition gets processed by a single pid (my use case), are there any issues with using x - n as the starting offset?
