From b18b8d0edb1d9318f4e2734c9c11674379ed1e11 Mon Sep 17 00:00:00 2001 From: Mike Kabischev Date: Tue, 7 Mar 2017 00:48:03 +0300 Subject: [PATCH 1/7] stats for client --- memcache/memcache.go | 31 +++++++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/memcache/memcache.go b/memcache/memcache.go index b98a7653..f76cbda8 100644 --- a/memcache/memcache.go +++ b/memcache/memcache.go @@ -25,10 +25,10 @@ import ( "io" "io/ioutil" "net" - "strconv" "strings" "sync" + "sync/atomic" "time" ) @@ -129,6 +129,12 @@ func NewFromSelector(ss ServerSelector) *Client { return &Client{selector: ss} } +// Stats contains statistic about connections being used by client. +type Stats struct { + ActiveConns int + IdleConns int +} + // Client is a memcache client. // It is safe for unlocked use by multiple concurrent goroutines. type Client struct { @@ -144,9 +150,12 @@ type Client struct { // be set to a number higher than your peak parallel requests. MaxIdleConns int + // number of currently used connections + activeConns int32 + selector ServerSelector - lk sync.Mutex + lk sync.RWMutex freeconn map[string][]*conn } @@ -193,6 +202,7 @@ func (cn *conn) extendDeadline() { // cache miss). The purpose is to not recycle TCP connections that // are bad. func (cn *conn) condRelease(err *error) { + atomic.AddInt32(&cn.c.activeConns, -1) if *err == nil || resumableError(*err) { cn.release() } else { @@ -276,6 +286,7 @@ func (c *Client) getConn(addr net.Addr) (*conn, error) { cn, ok := c.getFreeConn(addr) if ok { cn.extendDeadline() + atomic.AddInt32(&c.activeConns, 1) return cn, nil } nc, err := c.dial(addr) @@ -289,6 +300,7 @@ func (c *Client) getConn(addr net.Addr) (*conn, error) { c: c, } cn.extendDeadline() + atomic.AddInt32(&c.activeConns, 1) return cn, nil } @@ -465,6 +477,21 @@ func (c *Client) GetMulti(keys []string) (map[string]*Item, error) { return m, err } +// Stats returns current statistic +func (c *Client) Stats() Stats { + c.lk.RLock() + idleConns := 0 + for _, conns := range c.freeconn { + idleConns += len(conns) + } + c.lk.RUnlock() + + return Stats{ + ActiveConns: int(atomic.LoadInt32(&c.activeConns)), + IdleConns: idleConns, + } +} + // parseGetResponse reads a GET response from r and calls cb for each // read and allocated Item func parseGetResponse(r *bufio.Reader, cb func(*Item)) error { From e206b6e27577e762454f0c890728652cdc17dcb7 Mon Sep 17 00:00:00 2001 From: Mostyn Bramley-Moore Date: Mon, 28 May 2018 19:02:07 +0200 Subject: [PATCH 2/7] Replace code.google.com URL with godoc.org URL code.google.com is in archive/maintenance mode, and redirects the original URL to: https://cloud.google.com/appengine/docs/standard/go/memcache/reference Let's just use a godoc URL instead. --- memcache/memcache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/memcache/memcache.go b/memcache/memcache.go index b98a7653..1cb30fd7 100644 --- a/memcache/memcache.go +++ b/memcache/memcache.go @@ -33,7 +33,7 @@ import ( ) // Similar to: -// http://code.google.com/appengine/docs/go/memcache/reference.html +// https://godoc.org/google.golang.org/appengine/memcache var ( // ErrCacheMiss means that a Get failed because the item wasn't present. From f867c99a8ddb43e1703e8da9cb015b4c09699336 Mon Sep 17 00:00:00 2001 From: Andrew Rodland Date: Tue, 10 Jul 2018 11:55:40 -0400 Subject: [PATCH 3/7] Use ReadFull instead of ioutil.ReadAll to read objects (#76) * Use ReadFull instead of ioutil.ReadAll to read objects ioutil.ReadAll uses a bytes.Buffer, which allocates memory by doubling. Since we know exactly how muh data we expect to get, we can allocate it in advance. This reduces the total amount of allocation, and ensures that the slice stored in the item won't have excess capacity (which can affect memory usage if the item is held for a long time, for instance in a secondary cache). --- memcache/memcache.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/memcache/memcache.go b/memcache/memcache.go index 1cb30fd7..ef679f8c 100644 --- a/memcache/memcache.go +++ b/memcache/memcache.go @@ -23,7 +23,6 @@ import ( "errors" "fmt" "io" - "io/ioutil" "net" "strconv" @@ -481,11 +480,14 @@ func parseGetResponse(r *bufio.Reader, cb func(*Item)) error { if err != nil { return err } - it.Value, err = ioutil.ReadAll(io.LimitReader(r, int64(size)+2)) + it.Value = make([]byte, size+2) + _, err = io.ReadFull(r, it.Value) if err != nil { + it.Value = nil return err } if !bytes.HasSuffix(it.Value, crlf) { + it.Value = nil return fmt.Errorf("memcache: corrupt get result read") } it.Value = it.Value[:size] From bc664df9673713a0ccf26e3b55a673ec7301088b Mon Sep 17 00:00:00 2001 From: mostynb Date: Tue, 10 Jul 2018 17:56:16 +0200 Subject: [PATCH 4/7] Describe the meaning of 0 for Touch()'s seconds parameter (#82) This is mentioned in the Expiration field of the Item struct (no expiration time), but not in the notes about the Touch function. --- memcache/memcache.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/memcache/memcache.go b/memcache/memcache.go index ef679f8c..25e88ca2 100644 --- a/memcache/memcache.go +++ b/memcache/memcache.go @@ -325,8 +325,9 @@ func (c *Client) Get(key string) (item *Item, err error) { // Touch updates the expiry for the given key. The seconds parameter is either // a Unix timestamp or, if seconds is less than 1 month, the number of seconds -// into the future at which time the item will expire. ErrCacheMiss is returned if the -// key is not in the cache. The key must be at most 250 bytes in length. +// into the future at which time the item will expire. Zero means the item has +// no expiration time. ErrCacheMiss is returned if the key is not in the cache. +// The key must be at most 250 bytes in length. func (c *Client) Touch(key string, seconds int32) (err error) { return c.withKeyAddr(key, func(addr net.Addr) error { return c.touchFromAddr(addr, []string{key}, seconds) From 551aad21a6682b95329c1f5bd62ee5060d64f7e8 Mon Sep 17 00:00:00 2001 From: Colin Arnott Date: Fri, 29 Mar 2019 17:39:43 +0000 Subject: [PATCH 5/7] add mod file (#94) - [go modules](https://github.com/golang/go/wiki/Modules) will be enabled in 1.13. - after merge it would be nice if you would tag this repo: e.g. `v1.0.0` - this will allow consumers something more consistent and readable than `v0.0.0-20181229203832-0af3f3b09a0a`. --- go.mod | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 go.mod diff --git a/go.mod b/go.mod new file mode 100644 index 00000000..0d0eed2c --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/bradfitz/gomemcache + +go 1.12 From a41fca850d0b6f392931a78cbae438803ea0b886 Mon Sep 17 00:00:00 2001 From: Andrei Avram Date: Fri, 13 Sep 2019 20:36:17 +0300 Subject: [PATCH 6/7] Add Ping method. (#91) * Add Ping method. --- memcache/memcache.go | 31 +++++++++++++++++++++++++++++++ memcache/memcache_test.go | 3 +++ 2 files changed, 34 insertions(+) diff --git a/memcache/memcache.go b/memcache/memcache.go index 25e88ca2..545a3e79 100644 --- a/memcache/memcache.go +++ b/memcache/memcache.go @@ -112,6 +112,7 @@ var ( resultTouched = []byte("TOUCHED\r\n") resultClientErrorPrefix = []byte("CLIENT_ERROR ") + versionPrefix = []byte("VERSION") ) // New returns a memcache client using the provided server(s) @@ -398,6 +399,30 @@ func (c *Client) flushAllFromAddr(addr net.Addr) error { }) } +// ping sends the version command to the given addr +func (c *Client) ping(addr net.Addr) error { + return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error { + if _, err := fmt.Fprintf(rw, "version\r\n"); err != nil { + return err + } + if err := rw.Flush(); err != nil { + return err + } + line, err := rw.ReadSlice('\n') + if err != nil { + return err + } + + switch { + case bytes.HasPrefix(line, versionPrefix): + break + default: + return fmt.Errorf("memcache: unexpected response line from ping: %q", string(line)) + } + return nil + }) +} + func (c *Client) touchFromAddr(addr net.Addr, keys []string, expiration int32) error { return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error { for _, key := range keys { @@ -644,6 +669,12 @@ func (c *Client) DeleteAll() error { }) } +// Ping checks all instances if they are alive. Returns error if any +// of them is down. +func (c *Client) Ping() error { + return c.selector.Each(c.ping) +} + // Increment atomically increments key by delta. The return value is // the new value after being incremented or an error. If the value // didn't exist in memcached the error is ErrCacheMiss. The value in diff --git a/memcache/memcache_test.go b/memcache/memcache_test.go index 4b52a911..70d47026 100644 --- a/memcache/memcache_test.go +++ b/memcache/memcache_test.go @@ -209,6 +209,9 @@ func testWithClient(t *testing.T, c *Client) { t.Errorf("post-DeleteAll want ErrCacheMiss, got %v", err) } + // Test Ping + err = c.Ping() + checkErr(err, "error ping: %s", err) } func testTouchWithClient(t *testing.T, c *Client) { From 4fd84d0bc9a28664511eafbd8fb370a393df8a43 Mon Sep 17 00:00:00 2001 From: Mike Kabischev Date: Tue, 7 Mar 2017 00:48:03 +0300 Subject: [PATCH 7/7] stats for client --- memcache/memcache.go | 31 +++++++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/memcache/memcache.go b/memcache/memcache.go index 545a3e79..2ad8fc4b 100644 --- a/memcache/memcache.go +++ b/memcache/memcache.go @@ -24,10 +24,10 @@ import ( "fmt" "io" "net" - "strconv" "strings" "sync" + "sync/atomic" "time" ) @@ -129,6 +129,12 @@ func NewFromSelector(ss ServerSelector) *Client { return &Client{selector: ss} } +// Stats contains statistic about connections being used by client. +type Stats struct { + ActiveConns int + IdleConns int +} + // Client is a memcache client. // It is safe for unlocked use by multiple concurrent goroutines. type Client struct { @@ -144,9 +150,12 @@ type Client struct { // be set to a number higher than your peak parallel requests. MaxIdleConns int + // number of currently used connections + activeConns int32 + selector ServerSelector - lk sync.Mutex + lk sync.RWMutex freeconn map[string][]*conn } @@ -193,6 +202,7 @@ func (cn *conn) extendDeadline() { // cache miss). The purpose is to not recycle TCP connections that // are bad. func (cn *conn) condRelease(err *error) { + atomic.AddInt32(&cn.c.activeConns, -1) if *err == nil || resumableError(*err) { cn.release() } else { @@ -276,6 +286,7 @@ func (c *Client) getConn(addr net.Addr) (*conn, error) { cn, ok := c.getFreeConn(addr) if ok { cn.extendDeadline() + atomic.AddInt32(&c.activeConns, 1) return cn, nil } nc, err := c.dial(addr) @@ -289,6 +300,7 @@ func (c *Client) getConn(addr net.Addr) (*conn, error) { c: c, } cn.extendDeadline() + atomic.AddInt32(&c.activeConns, 1) return cn, nil } @@ -490,6 +502,21 @@ func (c *Client) GetMulti(keys []string) (map[string]*Item, error) { return m, err } +// Stats returns current statistic +func (c *Client) Stats() Stats { + c.lk.RLock() + idleConns := 0 + for _, conns := range c.freeconn { + idleConns += len(conns) + } + c.lk.RUnlock() + + return Stats{ + ActiveConns: int(atomic.LoadInt32(&c.activeConns)), + IdleConns: idleConns, + } +} + // parseGetResponse reads a GET response from r and calls cb for each // read and allocated Item func parseGetResponse(r *bufio.Reader, cb func(*Item)) error {