Rebalance breaks when running multiple consumers #98

Open

baconalot opened this issue Apr 23, 2015 · 3 comments

@baconalot

Use case:
Run N Chronos/Mesos jobs for a single consumer group, where N == the number of partitions in the topic. (Let's assume a single-topic consumer group here.)

Test:
- Create a Go consumer that reads from a large local Kafka topic with 2 partitions.
- Start one instance (pid 1) -> looks OK, alternates between partition 0 and 1.
- Start another (pid 2) -> looks OK, consumes from only one partition.
- But pid 1 then crashes with the following stack:

<<happily consuming here>>
Message{Topic: dev_allrequests_3, Partition: 0, Offset: 8002}
Message{Topic: dev_allrequests_3, Partition: 0, Offset: 8003}
Message{Topic: dev_allrequests_3, Partition: 0, Offset: 8004}
Message{Topic: dev_allrequests_3, Partition: 0, Offset: 8005}
Message{Topic: dev_allrequests_3, Partition: 0, Offset: 8006}
Message{Topic: dev_allrequests_3, Partition: 0, Offset: 8007}
2015-04-23/17:22:25 [DEBUG] [zk] Getting info for broker 0
2015-04-23/17:22:25 [DEBUG] [zk] Trying to get partition assignments for topics [dev_allrequests_3]
2015-04-23/17:22:25 [DEBUG] [zk] Getting consumers in group asdasdsdads_cango
2015-04-23/17:22:25 [DEBUG] [zk] Getting consumers in group asdasdsdads_cango
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Releasing partition ownership
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Successfully released partition ownership
2015-04-23/17:22:25 [INFO] [zk] Commencing assertion series at /consumers/asdasdsdads_cango/api/rebalance/e8ee221f572c5a810b7c60818eadeb69
2015-04-23/17:22:25 [INFO] [zk] Joining state barrier /consumers/asdasdsdads_cango/api/rebalance/e8ee221f572c5a810b7c60818eadeb69
2015-04-23/17:22:25 [DEBUG] [zk] Trying to create path /consumers/asdasdsdads_cango/api/rebalance/e8ee221f572c5a810b7c60818eadeb69/ebdffa95-ce32-e383-fddb-eec3d9a2e571 in Zookeeper
2015-04-23/17:22:25 [INFO] [zk] Successfully joined state barrier /consumers/asdasdsdads_cango/api/rebalance/e8ee221f572c5a810b7c60818eadeb69
2015-04-23/17:22:25 [DEBUG] [zk] Trying to assert rebalance state for group asdasdsdads_cango and hash e8ee221f572c5a810b7c60818eadeb69 with 2
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] [%!!(MISSING)s(int32=0) %!!(MISSING)s(int32=1)]
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] {ebdffa95-ce32-e383-fddb-eec3d9a2e571 %!s(int=0)} attempting to claim {Topic: dev_allrequests_3, Partition: 1}
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Consumer is trying to reflect partition ownership decision: map[{dev_allrequests_3 1}:{ebdffa95-ce32-e383-fddb-eec3d9a2e571 0}]

2015-04-23/17:22:25 [DEBUG] [zk] Trying to create path /consumers/asdasdsdads_cango/owners/dev_allrequests_3 in Zookeeper
2015-04-23/17:22:25 [DEBUG] [zk] Successfully claimed partition 1 in topic dev_allrequests_3 for {ebdffa95-ce32-e383-fddb-eec3d9a2e571 %!s(int=0)}
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Consumer successfully claimed partition 1 for topic dev_allrequests_3
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Partition ownership has been successfully reflected
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Trying to reinitialize fetchers and workers
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Trying to update fetcher
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Updating fetcher with numStreams = 1
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Topic Registry = map[dev_allrequests_3:map[%!s(int32=1):{Topic: dev_allrequests_3, Partition: 1, FetchedOffset: -1, Buffer: {Topic: dev_allrequests_3, Partition: 1}-MessageBuffer}]]
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Fetcher Manager started
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] TopicInfos = [{Topic: dev_allrequests_3, Partition: 1, FetchedOffset: -1, Buffer: {Topic: dev_allrequests_3, Partition: 1}-MessageBuffer}]
2015-04-23/17:22:25 [DEBUG] [ConsumerFetcherRoutine-ebdffa95-ce32-e383-fddb-eec3d9a2e571-0] Received asknext for {Topic: dev_allrequests_3, Partition: 1}
2015-04-23/17:22:25 [DEBUG] [ConsumerFetcherRoutine-ebdffa95-ce32-e383-fddb-eec3d9a2e571-0] Partition map: map[{dev_allrequests_3 0}:8008 {dev_allrequests_3 1}:7506]
2015-04-23/17:22:25 [DEBUG] [Sarama client] Adding block: topic=dev_allrequests_3, partition=1, offset=7506, fetchsize=1048576
2015-04-23/17:22:25 [DEBUG] [ConsumerFetcherRoutine-ebdffa95-ce32-e383-fddb-eec3d9a2e571-0] Sent partition data to {dev_allrequests_3 %!s(int32=1)}
2015-04-23/17:22:25 [DEBUG] [Sarama client] Processing fetch response
2015-04-23/17:22:25 [DEBUG] [ConsumerFetcherRoutine-ebdffa95-ce32-e383-fddb-eec3d9a2e571-0] Received asknext for {Topic: dev_allrequests_3, Partition: 0}
2015-04-23/17:22:25 [DEBUG] [ConsumerFetcherRoutine-ebdffa95-ce32-e383-fddb-eec3d9a2e571-0] Partition map: map[{dev_allrequests_3 0}:8008 {dev_allrequests_3 1}:7506]
2015-04-23/17:22:25 [DEBUG] [Sarama client] Adding block: topic=dev_allrequests_3, partition=0, offset=8008, fetchsize=1048576
2015-04-23/17:22:25 [DEBUG] [ConsumerFetcherRoutine-ebdffa95-ce32-e383-fddb-eec3d9a2e571-0] Sent partition data to {dev_allrequests_3 %!s(int32=0)}
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Updating fetcher configuration
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Got new list of partitions to process map[{dev_allrequests_3 1}:{Topic: dev_allrequests_3, Partition: 1, FetchedOffset: 0, Buffer: {Topic: dev_allrequests_3, Partition: 1}-MessageBuffer}]
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] All partitions map: map[]
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] There are obsolete partitions [{dev_allrequests_3 0}]
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Fetcher ConsumerFetcherRoutine-ebdffa95-ce32-e383-fddb-eec3d9a2e571-0 parition map before obsolete partitions removal%!(EXTRA map[go_kafka_client.TopicAndPartition]int64=map[{dev_allrequests_3 0}:8008 {dev_allrequests_3 1}:7506])
2015-04-23/17:22:25 [DEBUG] [ConsumerFetcherRoutine-ebdffa95-ce32-e383-fddb-eec3d9a2e571-0] Remove partitions
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Stopping worker manager for {dev_allrequests_3 %!s(int32=0)}
2015-04-23/17:22:25 [DEBUG] [WM-dev_allrequests_3-0] Trying to stop workerManager
2015-04-23/17:22:25 [DEBUG] [WM-dev_allrequests_3-0] Stopping manager
2015-04-23/17:22:25 [DEBUG] [WM-dev_allrequests_3-0] Stopping processor
2015-04-23/17:22:25 [DEBUG] [WM-dev_allrequests_3-0] Successful manager stop
2015-04-23/17:22:25 [DEBUG] [WM-dev_allrequests_3-0] Stopping committer
2015-04-23/17:22:25 [DEBUG] [WM-dev_allrequests_3-0] Successful committer stop
2015-04-23/17:22:25 [DEBUG] [WM-dev_allrequests_3-0] Stopped failure counter
2015-04-23/17:22:25 [DEBUG] [WM-dev_allrequests_3-0] Leaving manager stop
2015-04-23/17:22:25 [DEBUG] [WM-dev_allrequests_3-0] Stopped workerManager
2015-04-23/17:22:25 [DEBUG] [zk] Trying to update path /consumers/asdasdsdads_cango/offsets/dev_allrequests_3/0
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Stopping buffer: {Topic: dev_allrequests_3, Partition: 0}-MessageBuffer
2015-04-23/17:22:25 [INFO] [{Topic: dev_allrequests_3, Partition: 0}-MessageBuffer] Trying to stop message buffer
2015-04-23/17:22:25 [INFO] [{Topic: dev_allrequests_3, Partition: 0}-MessageBuffer] Stopping message buffer
2015-04-23/17:22:25 [INFO] [{Topic: dev_allrequests_3, Partition: 0}-MessageBuffer] Stopped message buffer
2015-04-23/17:22:25 [DEBUG] [Sarama client] Processing fetch response
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Fetcher ConsumerFetcherRoutine-ebdffa95-ce32-e383-fddb-eec3d9a2e571-0 parition map after obsolete partitions removal%!(EXTRA map[go_kafka_client.TopicAndPartition]int64=map[{dev_allrequests_3 1}:7506])
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Shutting down idle fetchers
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Closed idle fetchers
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Adding fetcher for partitions map[]
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] partitionsPerFetcher: map[]
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Applied new partition map map[{dev_allrequests_3 1}:{Topic: dev_allrequests_3, Partition: 1, FetchedOffset: 0, Buffer: {Topic: dev_allrequests_3, Partition: 1}-MessageBuffer}]
2015-04-23/17:22:25 [DEBUG] [ebdffa95-ce32-e383-fddb-eec3d9a2e571-manager] Notifying all waiters about completed update
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Updated fetcher
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Fetcher has been updated &{ebdffa95-ce32-e383-fddb-eec3d9a2e571 asdasdsdads_cango map[dev_allrequests_3:[{ebdffa95-ce32-e383-fddb-eec3d9a2e571 %!s(int=0)}]] %!s(*go_kafka_client.StaticTopicsToNumStreams=&{ebdffa95-ce32-e383-fddb-eec3d9a2e571 map[dev_allrequests_3:1]}) map[dev_allrequests_3:[%!s(int32=0) %!s(int32=1)]] map[dev_allrequests_3:[{934b2c74-7e81-8d6b-52c4-fd034c9a273e %!s(int=0)} {ebdffa95-ce32-e383-fddb-eec3d9a2e571 %!s(int=0)}]] [934b2c74-7e81-8d6b-52c4-fd034c9a273e ebdffa95-ce32-e383-fddb-eec3d9a2e571] [{Version: 1, Id: 0, Host: localhost, Port: 9092}] [all topics here]}
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Initializing worker managers from topic registry: map[dev_allrequests_3:map[%!s(int32=1):{Topic: dev_allrequests_3, Partition: 1, FetchedOffset: -1, Buffer: {Topic: dev_allrequests_3, Partition: 1}-MessageBuffer}]]
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Restarted streams
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Fetchers and workers have been successfully reinitialized
2015-04-23/17:22:25 [INFO] [ebdffa95-ce32-e383-fddb-eec3d9a2e571] Rebalance has been successfully completed
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x0 pc=0x4b4388]

goroutine 62144 [running]:
github.com/stealthly/go_kafka_client.func·015()
    /home/me/Development/dev/go/src/github.com/stealthly/go_kafka_client/fetcher.go:343 +0x618
github.com/stealthly/go_kafka_client.inReadLock(0xc208011048, 0xc2083bcf80)
    /home/me/Development/dev/go/src/github.com/stealthly/go_kafka_client/utils.go:52 +0x54
github.com/stealthly/go_kafka_client.(*consumerFetcherRoutine).processPartitionData(0xc2086f9450, 0xc2086ec840, 0x11, 0x0, 0xc20c616000, 0x1d6, 0x200)
    /home/me/Development/dev/go/src/github.com/stealthly/go_kafka_client/fetcher.go:348 +0x141
created by github.com/stealthly/go_kafka_client.func·013
    /home/me/Development/dev/go/src/github.com/stealthly/go_kafka_client/fetcher.go:268 +0xb82

<<more here>>

Fix:
I am not sure whether this has unwanted side effects, but I was able to fix it in go_kafka_client's fetcher.go:

func (f *consumerFetcherRoutine) processPartitionData(topicAndPartition TopicAndPartition, messages []*Message) {
    Trace(f, "Trying to acquire lock for partition processing")
    inReadLock(&f.manager.updateLock, func() {
        for f.manager.updateInProgress {
            f.manager.updatedCond.Wait()
        }
        Tracef(f, "Processing partition data for %s", topicAndPartition)
        if len(messages) > 0 {
+           if f.allPartitionMap[topicAndPartition] == nil {
+               return
+           }
            f.partitionMap[topicAndPartition] = messages[len(messages)-1].Offset + 1
            f.allPartitionMap[topicAndPartition].Buffer.addBatch(messages)
            Debugf(f, "Sent partition data to %s", topicAndPartition)
        } else {
            Trace(f, "Got empty message. Ignoring...")
        }
    })
}
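
For context on why that guard helps, here is a minimal, self-contained Go sketch of the failure mode; the types below are invented stand-ins (the real ones live in go_kafka_client), so treat this as an illustration rather than the library's actual code. During a rebalance the fetcher routine can still receive a batch for a partition it has just released; the map lookup then returns a nil pointer, and calling addBatch on it produces exactly the "invalid memory address or nil pointer dereference" panic shown above. The comma-ok lookup plays the same role as the nil check in the patch.

package main

import "fmt"

// Simplified stand-ins for the library's TopicAndPartition / partition info types.
type topicAndPartition struct {
    Topic     string
    Partition int32
}

type messageBuffer struct{ name string }

func (b *messageBuffer) addBatch(n int) {
    fmt.Printf("%s: buffered %d messages\n", b.name, n)
}

type partitionTopicInfo struct{ Buffer *messageBuffer }

func main() {
    // The routine's view of the partitions it currently owns.
    owned := map[topicAndPartition]*partitionTopicInfo{
        {"dev_allrequests_3", 1}: {Buffer: &messageBuffer{name: "partition 1"}},
    }

    // Partition 0 was handed to the other consumer during the rebalance,
    // so it is no longer present in the map.
    stale := topicAndPartition{"dev_allrequests_3", 0}

    // Unguarded access: owned[stale] is nil, so the next line would panic
    // with "invalid memory address or nil pointer dereference".
    // owned[stale].Buffer.addBatch(5)

    // Guarded access, equivalent to the nil check in the patch above:
    if info, ok := owned[stale]; ok {
        info.Buffer.addBatch(5)
    } else {
        fmt.Println("partition no longer owned, dropping the batch")
    }
}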
@serejja
Contributor

serejja commented Apr 24, 2015

Hey @baconalot, please check out the latest master. We got rid of using the allPartitionMap (the topic-partition map used by the fetcher manager) in the fetcherRoutines, so this problem should vanish now. Let us know, though, if it doesn't work as expected for you.

Thanks!
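
For readers following along, a schematic sketch of that direction (the names fetcherRoutine, removePartition, and processBatch are hypothetical, and this is not the actual commit): when each fetcher routine owns its partition state and guards it with its own lock, a batch that arrives for a partition released during a rebalance is simply dropped, instead of racing on the manager's shared allPartitionMap.

package main

import (
    "fmt"
    "sync"
)

type topicAndPartition struct {
    Topic     string
    Partition int32
}

// fetcherRoutine keeps its own offsets instead of reading state owned by the
// fetcher manager. Hypothetical shape, for illustration only.
type fetcherRoutine struct {
    lock       sync.Mutex
    partitions map[topicAndPartition]int64 // next offset to fetch per owned partition
}

func (f *fetcherRoutine) removePartition(tp topicAndPartition) {
    f.lock.Lock()
    defer f.lock.Unlock()
    delete(f.partitions, tp)
}

func (f *fetcherRoutine) processBatch(tp topicAndPartition, lastOffset int64, n int) {
    f.lock.Lock()
    defer f.lock.Unlock()
    if _, ok := f.partitions[tp]; !ok {
        fmt.Printf("dropping %d messages for released partition %v\n", n, tp)
        return
    }
    f.partitions[tp] = lastOffset + 1
    fmt.Printf("processed %d messages for %v, next offset %d\n", n, tp, lastOffset+1)
}

func main() {
    f := &fetcherRoutine{partitions: map[topicAndPartition]int64{
        {"dev_allrequests_3", 0}: 8008,
        {"dev_allrequests_3", 1}: 7506,
    }}

    // A rebalance releases partition 0 while a batch for it is still in flight.
    f.removePartition(topicAndPartition{"dev_allrequests_3", 0})
    f.processBatch(topicAndPartition{"dev_allrequests_3", 0}, 8012, 5) // dropped, no panic
    f.processBatch(topicAndPartition{"dev_allrequests_3", 1}, 7510, 5) // processed
}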

@baconalot
Author

Hi @serejja, that helps with the crash, which does not occur anymore. But now one of the pids gets all the partitions and the other gets nothing. It is arbitrary whether the most recently started one gets the partitions or they remain with the first one started, but they never balance like: pid1 -> part1 + part2, [enter pid2], pid1 -> part2 & pid2 -> part1.

@serejja
Contributor

serejja commented Sep 17, 2015

Hi @baconalot,

sorry for letting this get abandoned. Does this still occur? Lots of changes have been made since then, including fixes for many rebalance issues.

Thanks!
