gomemcache源码阅读
今天想学习一下代码是如何调用memcache的,就阅读了一下gomemcache(memcached的一个驱动库)的源码,原理还是很简单的,主要就是通过net/dial.go的方法来实现连接。
使用方法
import ( "github.com/bradfitz/gomemcache/memcache" ) func main() { mc := memcache.New("10.0.0.1:11211", "10.0.0.2:11211", "10.0.0.3:11212") mc.Set(&memcache.Item{Key: "foo", Value: []byte("my value")}) it, err := mc.Get("foo") ... }
使用方法很简单,调用New生成一个client实例,就可以调用Set/Get等方法进行数据库的操作。下面将介绍gomemcache的源码。
New
New时的输入参数是目标服务器的地址,可以输入多个,在有请求进入的时候会通过crc计算,分配到各个服务器,如果同一个地址输入多次,就会提高访问的权重。
包含有地址的slice的结构体,作为selector填入client结构体中,该interface包含选择指定服务器和遍历所有服务器执行函数两种方法。
type ServerList struct { mu sync.RWMutex addrs []net.Addr } type ServerSelector interface { PickServer(key string) (net.Addr, error) Each(func(net.Addr) error) error } type Client struct { Timeout time.Duration MaxIdleConns int selector ServerSelector lk sync.Mutex freeconn map[string][]*conn } func New(server ...string) *Client { ss := new(ServerList) ss.SetServers(server...) return NewFromSelector(ss) }
Set
Set方法会将数据库操作函数指针传递给onItem方法,onItem会先进行资源分配,再操作数据库。
可以看到onItem函数先进行了服务器的选择,获取连接,执行函数指针,再释放连接的操作。
func (c *Client) Set(item *Item) error { return c.onItem(item, (*Client).set) } func (c *Client) onItem(item *Item, fn func(*Client, *bufio.ReadWriter, *Item) error) error { addr, err := c.selector.PickServer(item.Key) if err != nil { return err } cn, err := c.getConn(addr) if err != nil { return err } defer cn.condRelease(&err) if err = fn(c, cn.rw, item); err != nil { return err } return nil }
PickServer函数,传入key键值,从pool中取出一块byte slice,拷贝key值至slice,计算crc,将crc值和服务器地址数量进行取余,值即为几号服务器,这种方法可以保证相同key的请求会进入相同的服务器。
func (ss *ServerList) PickServer(key string) (net.Addr, error) { ss.mu.RLock() defer ss.mu.RUnlock() if len(ss.addrs) == 0 { return nil, ErrNoServers } if len(ss.addrs) == 1 { return ss.addrs[0], nil } bufp := keyBufPool.Get().(*[]byte) n := copy(*bufp, key) cs := crc32.ChecksumIEEE((*bufp)[:n]) keyBufPool.Put(bufp) return ss.addrs[cs%uint32(len(ss.addrs))], nil }
getConn首先尝试从getFreeConn中获取连接,如果没有,则手动dial,并设置超时时间。
type conn struct { nc net.Conn rw *bufio.ReadWriter addr net.Addr c *Client } func (c *Client) getConn(addr net.Addr) (*conn, error) { cn, ok := c.getFreeConn(addr) if ok { cn.extendDeadline() return cn, nil } nc, err := c.dial(addr) if err != nil { return nil, err } cn = &conn{ nc: nc, addr: addr, rw: bufio.NewReadWriter(bufio.NewReader(nc), bufio.NewWriter(nc)), c: c, } cn.extendDeadline() return cn, nil }
getFreeConn,通过addr作为key值的一个map,每个addr都有一个conn结构体的slice。最初slice中没有连接,通过后续的连接动态添加。取连接时pop出slice的最后一项。
type Client struct{ freeconn map[string][]*conn } func (c *Client) getFreeConn(addr net.Addr) (cn *conn, ok bool) { c.lk.Lock() defer c.lk.Unlock() if c.freeconn == nil { return nil, false } freelist, ok := c.freeconn[addr.String()] if !ok || len(freelist) == 0 { return nil, false } cn = freelist[len(freelist)-1] c.freeconn[addr.String()] = freelist[:len(freelist)-1] return cn, true }
extendDeadline,函数设置dial连接的超时时间。如果没有设置client的超时时间,则取默认值100*time.Millisecond。
func (cn *conn) extendDeadline() { cn.nc.SetDeadline(time.Now().Add(cn.c.netTimeout())) } func (c *Client) netTimeout() time.Duration { if c.Timeout != 0 { return c.Timeout } return DefaultTimeout }
condRelease释放连接,和getConn对应,输入为数据库操作函数的返回值,如果返回值无错误或正常问题,则调用release,将连接放到freeconn池中。否则释放连接。
func (cn *conn) condRelease(err *error) { if *err == nil || resumableError(*err) { cn.release() } else { cn.nc.Close() } } func (cn *conn) release() { cn.c.putFreeConn(cn.addr, cn) } func (c *Client) putFreeConn(addr net.Addr, cn *conn) { c.lk.Lock() defer c.lk.Unlock() if c.freeconn == nil { c.freeconn = make(map[string][]*conn) } freelist := c.freeconn[addr.String()] // 如果连接池连接数量大于最大闲置数量,则释放 if len(freelist) >= c.maxIdleConns() { cn.nc.Close() return } c.freeconn[addr.String()] = append(freelist, cn) } func resumableError(err error) bool { switch err { case ErrCacheMiss, ErrCASConflict, ErrNotStored, ErrMalformedKey: return true } return false }
上面是获取、释放资源的外围操作,实际进行数据库操作的set如下。
Item结构体是Set请求的数据参数,包含key,value,操作参数,生存时间,casid。
var ( crlf = []byte("\r\n") ) type Item struct { Key string Value []byte Flags uint32 Expiration int32 casid uint64 } func (c *Client) set(rw *bufio.ReadWriter, item *Item) error { return c.populateOne(rw, "set", item) } func (c *Client) populateOne(rw *bufio.ReadWriter, verb string, item *Item) error { if !legalKey(item.Key) { return ErrMalformedKey } var err error if verb == "cas" { _, err = fmt.Fprintf(rw, "%s %s %d %d %d %d\r\n", verb, item.Key, item.Flags, item.Expiration, len(item.Value), item.casid) } else { _, err = fmt.Fprintf(rw, "%s %s %d %d %d\r\n", verb, item.Key, item.Flags, item.Expiration, len(item.Value)) } if err != nil { return err } if _, err = rw.Write(item.Value); err != nil { return err } if _, err := rw.Write(crlf); err != nil { return err } if err := rw.Flush(); err != nil { return err } line, err := rw.ReadSlice('\n') if err != nil { return err } switch { case bytes.Equal(line, resultStored): return nil case bytes.Equal(line, resultNotStored): return ErrNotStored case bytes.Equal(line, resultExists): return ErrCASConflict case bytes.Equal(line, resultNotFound): return ErrCacheMiss } return fmt.Errorf("memcache: unexpected response line from %q: %q", verb, string(line)) }
这里其实很简单,将数据写到连接中,写crlf,刷新缓冲区。读返回值,将返回值与标准值进行比较,获取返回结果。如此,一个完整的数据库操作就完成了。
其他的请求比如Get、Add等也是类似的操作,通过将标准memcache指令通过dial发送给服务器,接收到标准memcache返回,解析出真实数据。
其中还比较有趣的函数就是Ping。
Ping
使用该函数会对所有的服务器发送一条"version"命令,如果返回值除了版本前缀,还包含其他值,则ping成功。
func (c *Client) Ping() error { return c.selector.Each(c.ping) } func (c *Client) ping(addr net.Addr) error { return c.withAddrRw(addr, func(rw *bufio.ReadWriter) error { if _, err := fmt.Fprintf(rw, "version\r\n"); err != nil { return err } if err := rw.Flush(); err != nil { return err } line, err := rw.ReadSlice('\n') if err != nil { return err } switch { case bytes.HasPrefix(line, versionPrefix): break default: return fmt.Errorf("memcache: unexpected response line from ping: %q", string(line)) } return nil }) }
其中的锁使用
对于服务器的地址slice的操作使用了读写锁,读取时加读锁,写入时加写锁。
func (ss *ServerList) Each(f func(net.Addr) error) error { ss.mu.RLock() defer ss.mu.RUnlock() for _, a := range ss.addrs { if err := f(a); nil != err { return err } } return nil }
对于连接池则使用了互斥锁。
那么问题来了,能不能都使用RW锁,但是只用写锁的功能呢,答案是可以的,但是没有必要。在源码中,RWMutex的结构体中包含了Mutex结构体,说明读写锁是在互斥锁上层封装了功能,所以效率必不如互斥锁。
- 点赞
- 收藏
- 分享
- 文章举报
- nsq源码阅读 nsqd源码一 apps/nsqd/nsqd.go
- Memcache-Java-Client-Release源码阅读(之三)
- xUtils源码阅读(4)-MemCacheKey
- Memcache源码阅读(1)---看源码的心得
- Memcache源码阅读(6)---数据存储
- Go net/http 主要功能及部分源码阅读
- uboot源码阅读(八)江湖人物go
- Go语言Http Server源码阅读
- kubernetes源码之watch包filter.go阅读理解三
- kubernetes源码之watch包streamwatcher.go阅读理解五
- nsq源码阅读 nsqlookupd源码三 tcp.go tcp_server.go
- Memcache源码阅读(8)---多线程
- Memcache-Java-Client-Release源码阅读(之七)
- Memcache源码阅读(3)---处理用户输入
- go源码阅读笔记(math.1)
- Go缓存库cache2go源码阅读
- go-micro源码阅读笔记1
- nsq源码阅读 nsqlookupd源码四 lookup_protocol_v1.go
- Eclipse源码阅读:GotoTypeAction
- go-tour源码阅读