您的位置:首页 > 数据库 > Memcache

gomemcache源码阅读

2020-01-14 16:13 1456 查看

今天想学习一下代码是如何调用memcache的,就阅读了一下gomemcache(memcached的一个驱动库)的源码,原理还是很简单的,主要就是通过net/dial.go的方法来实现连接。

使用方法

import (
"github.com/bradfitz/gomemcache/memcache"
)

func main() {
mc := memcache.New("10.0.0.1:11211", "10.0.0.2:11211", "10.0.0.3:11212")
mc.Set(&memcache.Item{Key: "foo", Value: []byte("my value")})

it, err := mc.Get("foo")
...
}

使用方法很简单,调用New生成一个client实例,就可以调用Set/Get等方法进行数据库的操作。下面将介绍gomemcache的源码。

New

New时的输入参数是目标服务器的地址,可以输入多个,在有请求进入的时候会通过crc计算,分配到各个服务器,如果同一个地址输入多次,就会提高访问的权重。
包含有地址的slice的结构体,作为selector填入client结构体中,该interface包含选择指定服务器和遍历所有服务器执行函数两种方法。

type ServerList struct {
mu    sync.RWMutex
addrs []net.Addr
}
type ServerSelector interface {
PickServer(key string) (net.Addr, error)
Each(func(net.Addr) error) error
}
type Client struct {
Timeout time.Duration
MaxIdleConns int
selector ServerSelector
lk       sync.Mutex
freeconn map[string][]*conn
}
func New(server ...string) *Client {
ss := new(ServerList)
ss.SetServers(server...)
return NewFromSelector(ss)
}

Set

Set方法会将数据库操作函数指针传递给onItem方法,onItem会先进行资源分配,再操作数据库。
可以看到onItem函数先进行了服务器的选择,获取连接,执行函数指针,再释放连接的操作。

func (c *Client) Set(item *Item) error {
return c.onItem(item, (*Client).set)
}

func (c *Client) onItem(item *Item, fn func(*Client, *bufio.ReadWriter, *Item) error) error {
addr, err := c.selector.PickServer(item.Key)
if err != nil {
return err
}
cn, err := c.getConn(addr)
if err != nil {
return err
}
defer cn.condRelease(&err)
if err = fn(c, cn.rw, item); err != nil {
return err
}
return nil
}

PickServer函数,传入key键值,从pool中取出一块byte slice,拷贝key值至slice,计算crc,将crc值和服务器地址数量进行取余,值即为几号服务器,这种方法可以保证相同key的请求会进入相同的服务器。

func (ss *ServerList) PickServer(key string) (net.Addr, error) {
ss.mu.RLock()
defer ss.mu.RUnlock()
if len(ss.addrs) == 0 {
return nil, ErrNoServers
}
if len(ss.addrs) == 1 {
return ss.addrs[0], nil
}
bufp := keyBufPool.Get().(*[]byte)
n := copy(*bufp, key)
cs := crc32.ChecksumIEEE((*bufp)[:n])
keyBufPool.Put(bufp)

return ss.addrs[cs%uint32(len(ss.addrs))], nil
}

getConn首先尝试从getFreeConn中获取连接,如果没有,则手动dial,并设置超时时间。

type conn struct {
nc   net.Conn
rw   *bufio.ReadWriter
addr net.Addr
c    *Client
}
func (c *Client) getConn(addr net.Addr) (*conn, error) {
cn, ok := c.getFreeConn(addr)
if ok {
cn.extendDeadline()
return cn, nil
}
nc, err := c.dial(addr)
if err != nil {
return nil, err
}
cn = &conn{
nc:   nc,
addr: addr,
rw:   bufio.NewReadWriter(bufio.NewReader(nc), bufio.NewWriter(nc)),
c:    c,
}
cn.extendDeadline()
return cn, nil
}

getFreeConn,通过addr作为key值的一个map,每个addr都有一个conn结构体的slice。最初slice中没有连接,通过后续的连接动态添加。取连接时pop出slice的最后一项。

type Client struct{
freeconn map[string][]*conn
}
func (c *Client) getFreeConn(addr net.Addr) (cn *conn, ok bool) {
c.lk.Lock()
defer c.lk.Unlock()
if c.freeconn == nil {
return nil, false
}
freelist, ok := c.freeconn[addr.String()]
if !ok || len(freelist) == 0 {
return nil, false
}
cn = freelist[len(freelist)-1]
c.freeconn[addr.String()] = freelist[:len(freelist)-1]
return cn, true
}

extendDeadline,函数设置dial连接的超时时间。如果没有设置client的超时时间,则取默认值100*time.Millisecond。

func (cn *conn) extendDeadline() {
cn.nc.SetDeadline(time.Now().Add(cn.c.netTimeout()))
}
func (c *Client) netTimeout() time.Duration {
if c.Timeout != 0 {
return c.Timeout
}
return DefaultTimeout
}

condRelease释放连接,和getConn对应,输入为数据库操作函数的返回值,如果返回值无错误或正常问题,则调用release,将连接放到freeconn池中。否则释放连接。

func (cn *conn) condRelease(err *error) {
if *err == nil || resumableError(*err) {
cn.release()
} else {
cn.nc.Close()
}
}

func (cn *conn) release() {
cn.c.putFreeConn(cn.addr, cn)
}

func (c *Client) putFreeConn(addr net.Addr, cn *conn) {
c.lk.Lock()
defer c.lk.Unlock()
if c.freeconn == nil {
c.freeconn = make(map[string][]*conn)
}
freelist := c.freeconn[addr.String()]
// 如果连接池连接数量大于最大闲置数量,则释放
if len(freelist) >= c.maxIdleConns() {
cn.nc.Close()
return
}
c.freeconn[addr.String()] = append(freelist, cn)
}

func resumableError(err error) bool {
switch err {
case ErrCacheMiss, ErrCASConflict, ErrNotStored, ErrMalformedKey:
return true
}
return false
}

上面是获取、释放资源的外围操作,实际进行数据库操作的set如下。
Item结构体是Set请求的数据参数,包含key,value,操作参数,生存时间,casid。

var (
crlf            = []byte("\r\n")
)
type Item struct {
Key string
Value []byte
Flags uint32
Expiration int32
casid uint64
}
func (c *Client) set(rw *bufio.ReadWriter, item *Item) error {
return c.populateOne(rw, "set", item)
}
func (c *Client) populateOne(rw *bufio.ReadWriter, verb string, item *Item) error {
if !legalKey(item.Key) {
return ErrMalformedKey
}
var err error
if verb == "cas" {
_, err = fmt.Fprintf(rw, "%s %s %d %d %d %d\r\n",
verb, item.Key, item.Flags, item.Expiration, len(item.Value), item.casid)
} else {
_, err = fmt.Fprintf(rw, "%s %s %d %d %d\r\n",
verb, item.Key, item.Flags, item.Expiration, len(item.Value))
}
if err != nil {
return err
}
if _, err = rw.Write(item.Value); err != nil {
return err
}
if _, err := rw.Write(crlf); err != nil {
return err
}
if err := rw.Flush(); err != nil {
return err
}
line, err := rw.ReadSlice('\n')
if err != nil {
return err
}
switch {
case bytes.Equal(line, resultStored):
return nil
case bytes.Equal(line, resultNotStored):
return ErrNotStored
case bytes.Equal(line, resultExists):
return ErrCASConflict
case bytes.Equal(line, resultNotFound):
return ErrCacheMiss
}
return fmt.Errorf("memcache: unexpected response line from %q: %q", verb, string(line))
}

这里其实很简单,将数据写到连接中,写crlf,刷新缓冲区。读返回值,将返回值与标准值进行比较,获取返回结果。如此,一个完整的数据库操作就完成了。

其他的请求比如Get、Add等也是类似的操作,通过将标准memcache指令通过dial发送给服务器,接收到标准memcache返回,解析出真实数据。

其中还比较有趣的函数就是Ping。

Ping

使用该函数会对所有的服务器发送一条"version"命令,如果返回值除了版本前缀,还包含其他值,则ping成功。

func (c *Client) Ping() error {
return c.selector.Each(c.ping)
}
func (c *Client) ping(addr net.Addr) error {
return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error {
if _, err := fmt.Fprintf(rw, "version\r\n"); err != nil {
return err
}
if err := rw.Flush(); err != nil {
return err
}
line, err := rw.ReadSlice('\n')
if err != nil {
return err
}

switch {
case bytes.HasPrefix(line, versionPrefix):
break
default:
return fmt.Errorf("memcache: unexpected response line from ping: %q", string(line))
}
return nil
})
}

其中的锁使用

对于服务器的地址slice的操作使用了读写锁,读取时加读锁,写入时加写锁。

func (ss *ServerList) Each(f func(net.Addr) error) error {
ss.mu.RLock()
defer ss.mu.RUnlock()
for _, a := range ss.addrs {
if err := f(a); nil != err {
return err
}
}
return nil
}

对于连接池则使用了互斥锁。

那么问题来了,能不能都使用RW锁,但是只用写锁的功能呢,答案是可以的,但是没有必要。在源码中,RWMutex的结构体中包含了Mutex结构体,说明读写锁是在互斥锁上层封装了功能,所以效率必不如互斥锁。

  • 点赞
  • 收藏
  • 分享
  • 文章举报
gaorx1019 发布了13 篇原创文章 · 获赞 0 · 访问量 246 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: