Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client stats #63

Closed
wants to merge 10 commits into from
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/bradfitz/gomemcache

go 1.12
75 changes: 68 additions & 7 deletions memcache/memcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,16 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"net"

"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)

// Similar to:
// http://code.google.com/appengine/docs/go/memcache/reference.html
// https://godoc.org/google.golang.org/appengine/memcache

var (
// ErrCacheMiss means that a Get failed because the item wasn't present.
Expand Down Expand Up @@ -113,6 +112,7 @@ var (
resultTouched = []byte("TOUCHED\r\n")

resultClientErrorPrefix = []byte("CLIENT_ERROR ")
versionPrefix = []byte("VERSION")
)

// New returns a memcache client using the provided server(s)
Expand All @@ -129,6 +129,12 @@ func NewFromSelector(ss ServerSelector) *Client {
return &Client{selector: ss}
}

// Stats contains statistic about connections being used by client.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

statistics (plural)

type Stats struct {
ActiveConns int
IdleConns int
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

document these two also.

}

// Client is a memcache client.
// It is safe for unlocked use by multiple concurrent goroutines.
type Client struct {
Expand All @@ -144,9 +150,12 @@ type Client struct {
// be set to a number higher than your peak parallel requests.
MaxIdleConns int

// number of currently used connections
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add docs that this must be accessed atomically

activeConns int32

selector ServerSelector

lk sync.Mutex
lk sync.RWMutex
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't seem worth it.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe more straightforward to just make idleConns an atomic int as well and inc/dec when freeconn changes?

freeconn map[string][]*conn
}

Expand Down Expand Up @@ -193,6 +202,7 @@ func (cn *conn) extendDeadline() {
// cache miss). The purpose is to not recycle TCP connections that
// are bad.
func (cn *conn) condRelease(err *error) {
atomic.AddInt32(&cn.c.activeConns, -1)
if *err == nil || resumableError(*err) {
cn.release()
} else {
Expand Down Expand Up @@ -276,6 +286,7 @@ func (c *Client) getConn(addr net.Addr) (*conn, error) {
cn, ok := c.getFreeConn(addr)
if ok {
cn.extendDeadline()
atomic.AddInt32(&c.activeConns, 1)
return cn, nil
}
nc, err := c.dial(addr)
Expand All @@ -289,6 +300,7 @@ func (c *Client) getConn(addr net.Addr) (*conn, error) {
c: c,
}
cn.extendDeadline()
atomic.AddInt32(&c.activeConns, 1)
return cn, nil
}

Expand Down Expand Up @@ -326,8 +338,9 @@ func (c *Client) Get(key string) (item *Item, err error) {

// Touch updates the expiry for the given key. The seconds parameter is either
// a Unix timestamp or, if seconds is less than 1 month, the number of seconds
// into the future at which time the item will expire. ErrCacheMiss is returned if the
// key is not in the cache. The key must be at most 250 bytes in length.
// into the future at which time the item will expire. Zero means the item has
// no expiration time. ErrCacheMiss is returned if the key is not in the cache.
// The key must be at most 250 bytes in length.
func (c *Client) Touch(key string, seconds int32) (err error) {
return c.withKeyAddr(key, func(addr net.Addr) error {
return c.touchFromAddr(addr, []string{key}, seconds)
Expand Down Expand Up @@ -398,6 +411,30 @@ func (c *Client) flushAllFromAddr(addr net.Addr) error {
})
}

// ping sends the version command to the given addr
func (c *Client) ping(addr net.Addr) error {
return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error {
if _, err := fmt.Fprintf(rw, "version\r\n"); err != nil {
return err
}
if err := rw.Flush(); err != nil {
return err
}
line, err := rw.ReadSlice('\n')
if err != nil {
return err
}

switch {
case bytes.HasPrefix(line, versionPrefix):
break
default:
return fmt.Errorf("memcache: unexpected response line from ping: %q", string(line))
}
return nil
})
}

func (c *Client) touchFromAddr(addr net.Addr, keys []string, expiration int32) error {
return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error {
for _, key := range keys {
Expand Down Expand Up @@ -465,6 +502,21 @@ func (c *Client) GetMulti(keys []string) (map[string]*Item, error) {
return m, err
}

// Stats returns current statistic
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the current statistics. (with trailing period)

func (c *Client) Stats() Stats {
c.lk.RLock()
idleConns := 0
for _, conns := range c.freeconn {
idleConns += len(conns)
}
c.lk.RUnlock()

return Stats{
ActiveConns: int(atomic.LoadInt32(&c.activeConns)),
IdleConns: idleConns,
}
}

// parseGetResponse reads a GET response from r and calls cb for each
// read and allocated Item
func parseGetResponse(r *bufio.Reader, cb func(*Item)) error {
Expand All @@ -481,11 +533,14 @@ func parseGetResponse(r *bufio.Reader, cb func(*Item)) error {
if err != nil {
return err
}
it.Value, err = ioutil.ReadAll(io.LimitReader(r, int64(size)+2))
it.Value = make([]byte, size+2)
_, err = io.ReadFull(r, it.Value)
if err != nil {
it.Value = nil
return err
}
if !bytes.HasSuffix(it.Value, crlf) {
it.Value = nil
return fmt.Errorf("memcache: corrupt get result read")
}
it.Value = it.Value[:size]
Expand Down Expand Up @@ -641,6 +696,12 @@ func (c *Client) DeleteAll() error {
})
}

// Ping checks all instances if they are alive. Returns error if any
// of them is down.
func (c *Client) Ping() error {
return c.selector.Each(c.ping)
}

// Increment atomically increments key by delta. The return value is
// the new value after being incremented or an error. If the value
// didn't exist in memcached the error is ErrCacheMiss. The value in
Expand Down
3 changes: 3 additions & 0 deletions memcache/memcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ func testWithClient(t *testing.T, c *Client) {
t.Errorf("post-DeleteAll want ErrCacheMiss, got %v", err)
}

// Test Ping
err = c.Ping()
checkErr(err, "error ping: %s", err)
}

func testTouchWithClient(t *testing.T, c *Client) {
Expand Down