您的位置:首页 > 编程语言 > Go语言

Go net/PRC源码阅读client.go

2017-07-02 16:14 645 查看

Client端

本篇文章主要是在go net/rpc 的client.go包进行翻译,并添加注释之后会对client以及server进行总结,废话不多说 直接贴代码了。有不正确的地方还请多多指正。

package rpc

import (
"bufio"
"encoding/gob"
"errors"
"io"
"log"
"net"
"net/http"
"sync"
)

// ServerError represents an error that has been returned from
// the remote side of the RPC connection.
// 返回一个远程连接的错误
type ServerError string

func (e ServerError) Error() string {
return string(e)
}

var ErrShutdown = errors.New("connection is shut down")

// Call represents an active RPC.
type Call struct {
ServiceMethod string      // The name of the service and method to call.要调用的服务的名称和方法
Args          interface{} // The argument to the function (*struct).函数的参数
Reply         interface{} // The reply from the function (*struct). 函数的返回
Error         error       // After completion, the error status. 结束之后的错误状态
Done          chan *Call  // Strobes when call is complete.调用完成时
}

// Client represents an RPC Client. 代表一个RPC客户端
// There may be multiple outstanding Calls associated
// with a single Client, and a Client may be used by
// multiple goroutines simultaneously.
// 可能会有多个未完成的调用关联到一个Client,一个Client可能会被多个goroutine一起使用

type Client struct {
codec ClientCodec

reqMutex sync.Mutex // protects following 互斥锁
request  Request    // server端定义的结构体

mutex    sync.Mutex // protects following 互斥锁
seq      uint64
pending  map[uint64]*Call
closing  bool // user has called Close   用户调用了Close
shutdown bool // server has told us to stop 用户告诉我们停止
}

// A ClientCodec implements writing of RPC requests and
// reading of RPC responses for the client side of an RPC session.
// The client calls WriteRequest to write a request to the connection
// and calls ReadResponseHeader and ReadResponseBody in pairs
// to read responses. The client calls Close when finished with the
// connection. ReadResponseBody may be called with a nil
// argument to force the body of the response to be read and then
// discarded.
// 一个clientcodec实现为一个RPC会话的客户端RPC请求和RPC响应读写。
// 客户端调用WriteRequest写请求,调用ReadResponseHeader和ReadResponseBody配合读响应
// 客户端在一个链接结束后调用Close
// ReadResponseBody也许会被传入一个nil,迫使响应体被读取之后被丢弃。
type ClientCodec interface {
// WriteRequest must be safe for concurrent use by multiple goroutines.
WriteRequest(*Request, interface{}) error
ReadResponseHeader(*Response) error
ReadResponseBody(interface{}) error

Close() error
}

//发送
func (client *Client) send(call *Call) {
client.reqMutex.Lock()     // 加锁
defer client.reqMutex.Unlock() // defer延迟调用保证会解锁

// Register this call. 注册这次请求
client.mutex.Lock()
if client.shutdown || client.closing {   // 若处于停止或关闭状态 返回
call.Error = ErrShutdown
client.mutex.Unlock()
call.done()
return
}
seq := client.seq
client.seq++
client.pending[seq] = call
client.mutex.Unlock()

// Encode and send the request.编码并发送请求
client.request.Seq = seq
client.request.ServiceMethod = call.ServiceMethod //要调用的服务名和方法
err := client.codec.WriteRequest(&client.request, call.Args) //写入请求
if err != nil {
client.mutex.Lock()
call = client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
if call != nil {
call.Error = err
call.done()
}
}
}

// 输入
func (client *Client) input() {
var err error
var response Response     // server端定义的结构体
for err == nil {
response = Response{} // 赋空值
err = client.codec.ReadResponseHeader(&response)
if err != nil {
break
}
seq := response.Seq
client.mutex.Lock()
call := client.pending[seq]  // value为struct call的map
delete(client.pending, seq)
client.mutex.Unlock()

switch {
case call == nil:
// We've got no pending call. That usually means that
// WriteRequest partially failed, and call was already
// removed; response is a server telling us about an
// error reading request body. We should still attempt
// to read error body, but there's no one to give it to.
// 在请求的期间我们没有获取到,这通常意味着WriteRequest部分失败,请求已经被移除
// 响应是一个服务告诉我们在读请求体时的一些错误。我们应该尝试的读错误,但是没有能获取的。

err = client.codec.ReadResponseBody(nil) // 与ReadResponseHeader成对使用并传入nil迫使丢弃响应体
if err != nil {
err = errors.New("reading error body: " + err.Error())
}
case response.Error != "":
// We've got an error response. Give this to the request;
// any subsequent requests will get the ReadResponseBody
// error if there is one.
// 我们得到了一个返回错误,把这个返回给请求。如果有错误随后的任何请求都会通过ReadResponse获取
call.Error = ServerError(response.Error)
err = client.codec.ReadResponseBody(nil)
if err != nil {
err = errors.New("reading error body: " + err.Error())
}
call.done()
default:
err = client.codec.ReadResponseBody(call.Reply)
if err != nil {
call.Error = errors.New("reading body " + err.Error())
}
call.done()
}
}
// Terminate pending calls.
// 终止请求
client.reqMutex.Lock()
client.mutex.Lock()
client.shutdown = true
closing := client.closing
if err == io.EOF {
if closing {
err = ErrShutdown
} else {
err = io.ErrUnexpectedEOF
}
}
for _, call := range client.pending {
call.Error = err
call.done()
}
client.mutex.Unlock()
client.reqMutex.Unlock()
if debugLog && err != io.EOF && !closing {
log.Println("rpc: client protocol error:", err)
}
}

func (call *Call) done() {
select {
case call.Done <- call:
// ok
default:
// We don't want to block here. It is the caller's responsibility to make
// sure the channel has enough buffer space. See comment in Go().
// 我们不想在这加锁。这是调用者的责任来确保channel有足够的内存空间。
if debugLog {
log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
}
}
}

// NewClient returns a new Client to handle requests to the
// set of services at the other end of the connection.
// It adds a buffer to the write side of the connection so
// the header and payload are sent as a unit.
// NewClinet返回一个新的客户端句柄,在链接的有效期内链接服务端发送请求。
// 它向连接的写入方添加一个缓冲区,因此请求头和有效载荷作为一个整体发送
func NewClient(conn io.ReadWriteCloser) *Client {
encBuf := bufio.NewWriter(conn)
client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
return NewClientWithCodec(client)
}

// NewClientWithCodec is like NewClient but uses the specified
// codec to encode requests and decode responses.
// NewClientWithCodec与NewClient相似,但是它使用指定得编码器编码请求,解码返回
func NewClientWithCodec(codec ClientCodec) *Client {
client := &Client{
codec:   codec,
pending: make(map[uint64]*Call),
}
go client.input()
return client
}

type gobClientCodec struct {
rwc    io.ReadWriteCloser
dec    *gob.Decoder
enc    *gob.Encoder
encBuf *bufio.Writer
}

// 写请求
func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) {
if err = c.enc.Encode(r); err != nil {  // 编码
return
}
if err = c.enc.Encode(body); err != nil { // 编码
return
}
return c.encBuf.Flush() // 将c写入io
}

func (c *gobClientCodec) ReadResponseHeader(r *Response) error {
return c.dec.Decode(r)
}

func (c *gobClientCodec) ReadResponseBody(body interface{}) error {
return c.dec.Decode(body)
}

func (c *gobClientCodec) Close() error {
return c.rwc.Close()
}

// DialHTTP connects to an HTTP RPC server at the specified network address
// listening on the default HTTP RPC path.
// dialhttp连接到指定的网络地址http RPC服务器,监听默认的HTTP RPC路径
func DialHTTP(network, address string) (*Client, error) {
return DialHTTPPath(network, address, DefaultRPCPath)
}

// DialHTTPPath connects to an HTTP RPC server
// at the specified network address and path.
// DialHTTPPath连接在指定的网络地址和路径HTTP RPC服务器
func DialHTTPPath(network, address, path string) (*Client, error) {
var err error
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n")

// Require successful HTTP response
// before switching to RPC protocol.
// 在切换到RPC协议之前需要成功的HTTP响应。
resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
if err == nil && resp.Status == connected {
return NewClient(conn), nil
}
if err == nil {
err = errors.New("unexpected HTTP response: " + resp.Status)
}
conn.Close()
return nil, &net.OpError{
Op:   "dial-http",
Net:  network + " " + address,
Addr: nil,
Err:  err,
}
}

// Dial connects to an RPC server at the specified network address.
// Dial 链接到指定网络地址的RPC服务
func Dial(network, address string) (*Client, error) {
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
return NewClient(conn), nil
}

func (client *Client) Close() error {
client.mutex.Lock()
if client.closing {
client.mutex.Unlock()
return ErrShutdown
}
client.closing = true
client.mutex.Unlock()
return client.codec.Close()
}

// Go invokes the function asynchronously. It returns the Call structure representing
// the invocation. The done channel will signal when the call is complete by returning
// the same Call object. If done is nil, Go will allocate a new channel.
// If non-nil, done must be buffered or Go will deliberately crash.
// 异步调用函数。它返回call结构体。done的通道将在调用完成时发出相同的调用对象来发出信号。
// 如果done为nil,Go将会分配一个新的channel。如果non-nil,done必须有缓存否则会崩溃
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
call := new(Call)
call.ServiceMethod = serviceMethod //调用的服务的名称和方法
call.Args = args  //参数
call.Reply = reply // 返回
if done == nil {
done = make(chan *Call, 10) // buffered.建立有缓存的channel
} else {
// If caller passes done != nil, it must arrange that
// done has enough buffer for the number of simultaneous
// RPCs that will be using that channel. If the channel
// is totally unbuffered, it's best not to run at all.
// 如果调用者传递的done不为nil,他必须保证done具有足够的缓存,用以保证同时有若干RPCs使用那个channel。
// 如果channel是无缓冲,就不要运行了
if cap(done) == 0 {
log.Panic("rpc: done channel is unbuffered")
}
}
call.Done = done
client.send(call)
return call
}

// Call invokes the named function, waits for it to complete, and returns its error status.
// 调用命名函数,等待它完成,并返回其错误状态
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: