Go net/PRC源码阅读client.go
2017-07-02 16:14
645 查看
Client端
本篇文章主要是在go net/rpc 的client.go包进行翻译,并添加注释之后会对client以及server进行总结,废话不多说 直接贴代码了。有不正确的地方还请多多指正。package rpc import ( "bufio" "encoding/gob" "errors" "io" "log" "net" "net/http" "sync" ) // ServerError represents an error that has been returned from // the remote side of the RPC connection. // 返回一个远程连接的错误 type ServerError string func (e ServerError) Error() string { return string(e) } var ErrShutdown = errors.New("connection is shut down") // Call represents an active RPC. type Call struct { ServiceMethod string // The name of the service and method to call.要调用的服务的名称和方法 Args interface{} // The argument to the function (*struct).函数的参数 Reply interface{} // The reply from the function (*struct). 函数的返回 Error error // After completion, the error status. 结束之后的错误状态 Done chan *Call // Strobes when call is complete.调用完成时 } // Client represents an RPC Client. 代表一个RPC客户端 // There may be multiple outstanding Calls associated // with a single Client, and a Client may be used by // multiple goroutines simultaneously. // 可能会有多个未完成的调用关联到一个Client,一个Client可能会被多个goroutine一起使用 type Client struct { codec ClientCodec reqMutex sync.Mutex // protects following 互斥锁 request Request // server端定义的结构体 mutex sync.Mutex // protects following 互斥锁 seq uint64 pending map[uint64]*Call closing bool // user has called Close 用户调用了Close shutdown bool // server has told us to stop 用户告诉我们停止 } // A ClientCodec implements writing of RPC requests and // reading of RPC responses for the client side of an RPC session. // The client calls WriteRequest to write a request to the connection // and calls ReadResponseHeader and ReadResponseBody in pairs // to read responses. The client calls Close when finished with the // connection. ReadResponseBody may be called with a nil // argument to force the body of the response to be read and then // discarded. // 一个clientcodec实现为一个RPC会话的客户端RPC请求和RPC响应读写。 // 客户端调用WriteRequest写请求,调用ReadResponseHeader和ReadResponseBody配合读响应 // 客户端在一个链接结束后调用Close // ReadResponseBody也许会被传入一个nil,迫使响应体被读取之后被丢弃。 type ClientCodec interface { // WriteRequest must be safe for concurrent use by multiple goroutines. WriteRequest(*Request, interface{}) error ReadResponseHeader(*Response) error ReadResponseBody(interface{}) error Close() error } //发送 func (client *Client) send(call *Call) { client.reqMutex.Lock() // 加锁 defer client.reqMutex.Unlock() // defer延迟调用保证会解锁 // Register this call. 注册这次请求 client.mutex.Lock() if client.shutdown || client.closing { // 若处于停止或关闭状态 返回 call.Error = ErrShutdown client.mutex.Unlock() call.done() return } seq := client.seq client.seq++ client.pending[seq] = call client.mutex.Unlock() // Encode and send the request.编码并发送请求 client.request.Seq = seq client.request.ServiceMethod = call.ServiceMethod //要调用的服务名和方法 err := client.codec.WriteRequest(&client.request, call.Args) //写入请求 if err != nil { client.mutex.Lock() call = client.pending[seq] delete(client.pending, seq) client.mutex.Unlock() if call != nil { call.Error = err call.done() } } } // 输入 func (client *Client) input() { var err error var response Response // server端定义的结构体 for err == nil { response = Response{} // 赋空值 err = client.codec.ReadResponseHeader(&response) if err != nil { break } seq := response.Seq client.mutex.Lock() call := client.pending[seq] // value为struct call的map delete(client.pending, seq) client.mutex.Unlock() switch { case call == nil: // We've got no pending call. That usually means that // WriteRequest partially failed, and call was already // removed; response is a server telling us about an // error reading request body. We should still attempt // to read error body, but there's no one to give it to. // 在请求的期间我们没有获取到,这通常意味着WriteRequest部分失败,请求已经被移除 // 响应是一个服务告诉我们在读请求体时的一些错误。我们应该尝试的读错误,但是没有能获取的。 err = client.codec.ReadResponseBody(nil) // 与ReadResponseHeader成对使用并传入nil迫使丢弃响应体 if err != nil { err = errors.New("reading error body: " + err.Error()) } case response.Error != "": // We've got an error response. Give this to the request; // any subsequent requests will get the ReadResponseBody // error if there is one. // 我们得到了一个返回错误,把这个返回给请求。如果有错误随后的任何请求都会通过ReadResponse获取 call.Error = ServerError(response.Error) err = client.codec.ReadResponseBody(nil) if err != nil { err = errors.New("reading error body: " + err.Error()) } call.done() default: err = client.codec.ReadResponseBody(call.Reply) if err != nil { call.Error = errors.New("reading body " + err.Error()) } call.done() } } // Terminate pending calls. // 终止请求 client.reqMutex.Lock() client.mutex.Lock() client.shutdown = true closing := client.closing if err == io.EOF { if closing { err = ErrShutdown } else { err = io.ErrUnexpectedEOF } } for _, call := range client.pending { call.Error = err call.done() } client.mutex.Unlock() client.reqMutex.Unlock() if debugLog && err != io.EOF && !closing { log.Println("rpc: client protocol error:", err) } } func (call *Call) done() { select { case call.Done <- call: // ok default: // We don't want to block here. It is the caller's responsibility to make // sure the channel has enough buffer space. See comment in Go(). // 我们不想在这加锁。这是调用者的责任来确保channel有足够的内存空间。 if debugLog { log.Println("rpc: discarding Call reply due to insufficient Done chan capacity") } } } // NewClient returns a new Client to handle requests to the // set of services at the other end of the connection. // It adds a buffer to the write side of the connection so // the header and payload are sent as a unit. // NewClinet返回一个新的客户端句柄,在链接的有效期内链接服务端发送请求。 // 它向连接的写入方添加一个缓冲区,因此请求头和有效载荷作为一个整体发送 func NewClient(conn io.ReadWriteCloser) *Client { encBuf := bufio.NewWriter(conn) client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf} return NewClientWithCodec(client) } // NewClientWithCodec is like NewClient but uses the specified // codec to encode requests and decode responses. // NewClientWithCodec与NewClient相似,但是它使用指定得编码器编码请求,解码返回 func NewClientWithCodec(codec ClientCodec) *Client { client := &Client{ codec: codec, pending: make(map[uint64]*Call), } go client.input() return client } type gobClientCodec struct { rwc io.ReadWriteCloser dec *gob.Decoder enc *gob.Encoder encBuf *bufio.Writer } // 写请求 func (c *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) { if err = c.enc.Encode(r); err != nil { // 编码 return } if err = c.enc.Encode(body); err != nil { // 编码 return } return c.encBuf.Flush() // 将c写入io } func (c *gobClientCodec) ReadResponseHeader(r *Response) error { return c.dec.Decode(r) } func (c *gobClientCodec) ReadResponseBody(body interface{}) error { return c.dec.Decode(body) } func (c *gobClientCodec) Close() error { return c.rwc.Close() } // DialHTTP connects to an HTTP RPC server at the specified network address // listening on the default HTTP RPC path. // dialhttp连接到指定的网络地址http RPC服务器,监听默认的HTTP RPC路径 func DialHTTP(network, address string) (*Client, error) { return DialHTTPPath(network, address, DefaultRPCPath) } // DialHTTPPath connects to an HTTP RPC server // at the specified network address and path. // DialHTTPPath连接在指定的网络地址和路径HTTP RPC服务器 func DialHTTPPath(network, address, path string) (*Client, error) { var err error conn, err := net.Dial(network, address) if err != nil { return nil, err } io.WriteString(conn, "CONNECT "+path+" HTTP/1.0\n\n") // Require successful HTTP response // before switching to RPC protocol. // 在切换到RPC协议之前需要成功的HTTP响应。 resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"}) if err == nil && resp.Status == connected { return NewClient(conn), nil } if err == nil { err = errors.New("unexpected HTTP response: " + resp.Status) } conn.Close() return nil, &net.OpError{ Op: "dial-http", Net: network + " " + address, Addr: nil, Err: err, } } // Dial connects to an RPC server at the specified network address. // Dial 链接到指定网络地址的RPC服务 func Dial(network, address string) (*Client, error) { conn, err := net.Dial(network, address) if err != nil { return nil, err } return NewClient(conn), nil } func (client *Client) Close() error { client.mutex.Lock() if client.closing { client.mutex.Unlock() return ErrShutdown } client.closing = true client.mutex.Unlock() return client.codec.Close() } // Go invokes the function asynchronously. It returns the Call structure representing // the invocation. The done channel will signal when the call is complete by returning // the same Call object. If done is nil, Go will allocate a new channel. // If non-nil, done must be buffered or Go will deliberately crash. // 异步调用函数。它返回call结构体。done的通道将在调用完成时发出相同的调用对象来发出信号。 // 如果done为nil,Go将会分配一个新的channel。如果non-nil,done必须有缓存否则会崩溃 func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call { call := new(Call) call.ServiceMethod = serviceMethod //调用的服务的名称和方法 call.Args = args //参数 call.Reply = reply // 返回 if done == nil { done = make(chan *Call, 10) // buffered.建立有缓存的channel } else { // If caller passes done != nil, it must arrange that // done has enough buffer for the number of simultaneous // RPCs that will be using that channel. If the channel // is totally unbuffered, it's best not to run at all. // 如果调用者传递的done不为nil,他必须保证done具有足够的缓存,用以保证同时有若干RPCs使用那个channel。 // 如果channel是无缓冲,就不要运行了 if cap(done) == 0 { log.Panic("rpc: done channel is unbuffered") } } call.Done = done client.send(call) return call } // Call invokes the named function, waits for it to complete, and returns its error status. // 调用命名函数,等待它完成,并返回其错误状态 func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error { call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done return call.Error }
相关文章推荐
- go(golang) dns 解析源码 go/src/net/dnsclient_unix.go 分析
- Go net/http 主要功能及部分源码阅读
- Memcache-Java-Client-Release源码阅读(之四)
- Single Shot MultiBox Detector(MXNet)源码阅读笔记(2)
- ASP.NET Core 源码阅读笔记(5) ---Microsoft.AspNetCore.Routing路由
- Go语言Http Server源码阅读
- 28 GroupSock(NetAddress)——live555源码阅读(四)网络
- HttpClient4.3.6源码阅读 RequestConfig.Builder(Builder模式实践)
- Asp.net MVC源码分析--Model Validation(Client端)实现(1)
- nsq源码阅读 nsqd源码四 nsqd/lookup.go 与nsqlookupd服务的交互
- go HTTP Client大量长连接保持(自定义client设置及源码简单分析)
- [hadoop源码阅读][6]-org.apache.hadoop.ipc-ipc.client
- Memcache-Java-Client-Release源码阅读(之二)
- Asp.net MVC源码分析--Model Validation(Client端)实现(2)
- Eclipse源码阅读:GotoTypeAction
- Memcache-Java-Client-Release源码阅读(之一)
- Go 1.9 sync Map 源码阅读笔记
- kubernetes源码之watch包filter.go阅读理解三
- nsq源码阅读 nsqlookupd源码一 nsqlookupd.go
- 29 GroupSock(NetAddressList)——live555源码阅读(四)网络