Skip to content

Commit

Permalink
Merge pull request #1 from ankitkinra/discovery
Browse files Browse the repository at this point in the history
Add autodiscovery to master branch
  • Loading branch information
ankitkinra authored Mar 26, 2020
2 parents a41fca8 + 5c69222 commit 9428199
Show file tree
Hide file tree
Showing 9 changed files with 903 additions and 8 deletions.
34 changes: 28 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,24 @@
This is a memcache client library for the Go programming language
(http://golang.org/).

It is a fork of https://github.com/bradfitz/gomemcache/memcache,
with support for Autodiscovery client based on https://cloud.google.com/memorystore/docs/memcached/auto-discovery-overview


## Installing

### Using *go get*

$ go get github.com/bradfitz/gomemcache/memcache
$ go get github.com/ankitkinra/gomemcache/memcache

After this command *gomemcache* is ready to use. Its source will be in:

$GOPATH/src/github.com/bradfitz/gomemcache/memcache
$GOPATH/src/github.com/ankitkinra/gomemcache/memcache

## Example
## Example without Autodiscovery

import (
"github.com/bradfitz/gomemcache/memcache"
"github.com/ankitkinra/gomemcache/memcache"
)

func main() {
Expand All @@ -27,11 +31,29 @@ After this command *gomemcache* is ready to use. Its source will be in:
...
}

## Example with Autodiscovery

import (
"github.com/ankitkinra/gomemcache/memcache"
)

func main() {
mcDiscovery := memcache.NewDiscoveryClient("10.0.0.1:11211", 30 * time.Second)
mcDiscovery.Set(&memcache.Item{Key: "foo", Value: []byte("my value")})

it, err := mc.Get("foo")
...
mcDiscovery.Stop()
}

### Note
Remember to call Stop() on the discovery enabled client to stop the polling, else this can leak "go" methods.

## Full docs, see:

See https://godoc.org/github.com/bradfitz/gomemcache/memcache
See https://godoc.org/github.com/ankitkinra/gomemcache/memcache

Or run:

$ godoc github.com/bradfitz/gomemcache/memcache
$ godoc github.com/ankitkinra/gomemcache/memcache

95 changes: 95 additions & 0 deletions memcache/cluster_config_parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
Copyright 2020 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package memcache

import (
"bufio"
"fmt"
"strconv"
"strings"
)

var (
configKeyword = "CONFIG"
)

// ClusterNode represents address of a memcached node.
type ClusterNode struct {
Host string
Port int64
}

// ClusterConfig represents cluster configuration which contains nodes and version.
type ClusterConfig struct {
// ConfigId is the monotonically increasing identifier for the config information
ConfigID int64

// NodeAddresses are array of ClusterNode which contain address of a memcache node.
NodeAddresses []ClusterNode
}

// parseConfigGetResponse reads a CONFIG GET response from r and calls cb for each
// read and allocates ClusterConfig
func parseConfigGetResponse(r *bufio.Reader, cb func(*ClusterConfig)) error {
scanner := bufio.NewScanner(r)
clusterConfig := new(ClusterConfig)
// TODO-GO: Replace below document with Feature on github describing the change.
// Response from config service is here:
// https://docs.google.com/document/d/15V9tKuffWrcCVwDZRmRBOV1SDcuo6P8u05dddwZYxCo/edit
for scanner.Scan() {
line := scanner.Text()
line = strings.TrimSpace(line)
// Skip empty line
if line == "" {
continue
}
// CONFIG keyword line is expected as follows:
// CONFIG cluster 0 <count-of-bytes-in-next-two-lines>\r\n
if strings.Contains(line, configKeyword) {
// After config keyword we expect next line to contain config id in the form
// <config-id>\n
scanner.Scan()
configIDLine := strings.TrimSpace(scanner.Text())
configID, parseError := strconv.ParseInt(configIDLine, 10, 64)
if parseError != nil {
return parseError
}
clusterConfig.ConfigID = configID

// Read the third line of the response which contains host
// hostname1|ip-address1|port1<space>hostname2|ip-address2|port2<space>\n\r\n
scanner.Scan()
nodeHostPortAdds := strings.TrimSpace(scanner.Text())
// tokenize on space and then pipe
nodes := strings.Split(nodeHostPortAdds, " ")
for _, node := range nodes {
nodeHostPort := strings.Split(node, "|")
if len(nodeHostPort) < 3 {
return fmt.Errorf("host address (%s) not in expected format", nodeHostPort)
}
nodePort, intParseError := strconv.ParseInt(nodeHostPort[2], 10, 64)
if intParseError != nil {
return intParseError
}
clusterConfig.NodeAddresses = append(clusterConfig.NodeAddresses, ClusterNode{Host: nodeHostPort[1], Port: nodePort})
}
}

}
cb(clusterConfig)
return nil
}
110 changes: 110 additions & 0 deletions memcache/cluster_config_parser_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
Copyright 2020 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package memcache provides a client for the memcached cache server.
package memcache

import (
"bufio"
"fmt"
"reflect"
"strconv"
"strings"
"testing"
)

func prepareConfigResponse(discoveryID int, discoveryAddress [][]string) string {
var temp strings.Builder
temp.WriteString("")
temp.WriteString("CONFIG cluster 0 80\r\n")
temp.WriteString(fmt.Sprintf("%d", discoveryID))
temp.WriteString("\r\n")
for i, address := range discoveryAddress {
temp.WriteString(fmt.Sprintf("%s|%s|%s", address[0], address[0], address[1]))
if i < len(discoveryAddress)-1 {
temp.WriteString(" ")
}
}
temp.WriteString("\n\r\n")
return temp.String()
}

func buildClusterConfig(discoveryID int, discoveryAddress [][]string) *ClusterConfig {
cc := &ClusterConfig{ConfigID: int64(discoveryID)}
cc.NodeAddresses = make([]ClusterNode, len(discoveryAddress))
for i, address := range discoveryAddress {
port, _ := strconv.ParseInt(address[1], 10, 64)
cc.NodeAddresses[i] = ClusterNode{Host: address[0], Port: port}
}
return cc
}

func TestGoodClusterConfigs(t *testing.T) {
configTests := []struct {
discoveryID int
discoveryAddresses [][]string
}{
{2, [][]string{[]string{"localhost", "112233"}}},
{1000, [][]string{[]string{"localhost", "112233"}, []string{"127.0.0.4", "123435"}}},
{50, [][]string{[]string{"localhost", "112233"}, []string{"127.0.0.4", "123435"}, []string{"127.0", "123"}}},
}
for _, tt := range configTests {
config := prepareConfigResponse(tt.discoveryID, tt.discoveryAddresses)
want := buildClusterConfig(tt.discoveryID, tt.discoveryAddresses)
reader := bufio.NewReader(strings.NewReader(config))
got := &ClusterConfig{}
f := func(cb *ClusterConfig) {
got = cb
}
if err := parseConfigGetResponse(reader, f); err != nil {
t.Errorf("parseConfigGetResponse(%q) had parse err:%v", config, err)
}
if !reflect.DeepEqual(*got, *want) {
t.Errorf("configResponse(%q) = %v; want = %v", config, got, want)
}
}

}

func TestBrokenClusterConfigs(t *testing.T) {
emptyConfig := ClusterConfig{}
configTests := []struct {
configResponse string
wantErrorText string
wantConfig ClusterConfig
}{
{"", "", emptyConfig}, // empty config returns no error with empty cluster config
{"END\r\n", "", emptyConfig}, // empty config returns no error with empty cluster config
{"CONFIG cluster 0 80\r\nbadCfg\r\n123.76|123.76|5432\r\nEND\r\n", "strconv.ParseInt: parsing \"badCfg\"", emptyConfig}, // error parsing port
{"CONFIG cluster 0 80\r\n100\r\n123.76|123.76|portBroken\r\nEND\r\n", "strconv.ParseInt: parsing \"portBroken\"", emptyConfig}, // error parsing port
{"CONFIG cluster 0 80\r\n100\r\n123.76123.76portBroken\r\nEND\r\n", "host address ([123.76123.76portBroken]) not in expected format", emptyConfig}, // error tokenizing due to no pipes
{"CONFIG cluster 0 80\r\n100\r\n123.76|123.76|123123.76|123.76|123\r\nEND\r\n", "invalid syntax", emptyConfig}, // error tokenizing due to no spaces

}
for _, tt := range configTests {
reader := bufio.NewReader(strings.NewReader(tt.configResponse))
got := &ClusterConfig{}
f := func(cb *ClusterConfig) {
got = cb
}
if gotError := parseConfigGetResponse(reader, f); gotError != nil && !strings.Contains(gotError.Error(), tt.wantErrorText) {
t.Errorf("parseConfigGetResponse(%q) parse error mismatch, got:%v, wantText:%v", tt.configResponse, gotError, tt.wantErrorText)
}
if !reflect.DeepEqual(*got, tt.wantConfig) {
t.Errorf("parseConfigGetResponse(%q), gotConfig:%v was not equal to wantConfig: %v", tt.configResponse, got, tt.wantConfig)
}
}
}
129 changes: 129 additions & 0 deletions memcache/config_poller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
Copyright 2020 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package memcache

import (
"log"
"net"
"strconv"
"sync"
"time"
)

const clusterConfigName = "cluster"

// configPoller is config service poller.
// It is not safe for use by multiple concurrent goroutines.
type configPoller struct {
// pollingFrequency specified how often poller polls.
pollingFrequency time.Duration

tick *time.Ticker
done chan bool
once sync.Once

// reference to selector which will used to update the servers for the main client
serverList *ServerList

mc *Client

clusterConfigMU sync.RWMutex
prevClusterConfig *ClusterConfig
}

// creates a new cluster config poller
func newConfigPoller(frequency time.Duration, servers *ServerList, mc *Client) *configPoller {

poller := &configPoller{
pollingFrequency: frequency,
serverList: servers,
mc: mc,
tick: time.NewTicker(frequency),
done: make(chan bool),
}
// Hold the thread to initialize before returning.
err := poller.readConfigAndUpdateServerList()
if err != nil {
// no action required unless stop is explicitly called
log.Printf("Warning: First poll for discovery service failed due to %v", err)
}
go poller.readConfigPeriodically()
return poller
}

func (c *configPoller) readConfigPeriodically() {
for {
select {
case <-c.tick.C:
err := c.readConfigAndUpdateServerList()
if err != nil {
// no action required unless stop is explicitly called
log.Printf("Warning: Periodic poll for discovery service failed due to %v", err)
}
case <-c.done:
return
}
}
}

// Stop the internal polling.
func (c *configPoller) stopPolling() {
c.once.Do(func() {
close(c.done)
})
}

func (c *configPoller) readConfigAndUpdateServerList() error {
clusterConfig, err := c.mc.GetConfig(clusterConfigName)
if err != nil {
// nothing to do in this round.
return err
}
// compare existing config information with new config information
updateClusterConf := false
c.clusterConfigMU.RLock()
if c.prevClusterConfig != nil {
if clusterConfig.ConfigID > c.prevClusterConfig.ConfigID {
updateClusterConf = true
}
} else {
updateClusterConf = true
}
c.clusterConfigMU.RUnlock()

if updateClusterConf {
c.updateServerList(clusterConfig)
}
return nil
}

// updateServerList is not thread safe and should not be called without holding lock on clusterConfigMU
func (c *configPoller) updateServerList(cc *ClusterConfig) error {
s := getServerAddresses(cc)
c.serverList.SetServers(s...)
c.prevClusterConfig = cc
return nil
}

func getServerAddresses(cc *ClusterConfig) []string {
servers := make([]string, 0, len(cc.NodeAddresses))
for _, n := range cc.NodeAddresses {
// Validation happens when main memcache client tries to connect to this address
servers = append(servers, net.JoinHostPort(n.Host, strconv.FormatInt(n.Port, 10)))
}
return servers
}
Loading

0 comments on commit 9428199

Please sign in to comment.