Golang网络库中socket阻塞调度源码剖析
2016-06-28 20:45
399 查看
本文分析了Golang的socket文件描述符和goroutine阻塞调度的原理。代码中大部分是Go代码,小部分是汇编代码。完整理解本文需要Go语言知识,并且用Golang写过网络程序。更重要的是,需要提前理解goroutine的调度原理。
这个接口就是对下面的结构体
可以看到,对连接的所有操作,都体现在对
网络轮询器的数据结构如下:
res是runtime_pollWait函数返回的结果,由conevertErr函数包装后返回:
函数返回0,表示IO已经准备好,返回nil。
返回1,说明连接已关闭,应该放回errClosing。
返回2,说明对IO进行的操作发生超时,应该返回errTimeout。
runtime_pollWait会调用runtime/thunk.s中的函数:
这是一个包装函数,没有参数,直接跳转到runtime/netpoll.go中的函数netpollWait:
runtime/proc.go:
mcall函数是一段汇编,在m->g0的栈上调用park_m,而不是在当前goroutine的栈上。mcall的功能分两部分,第一部分保存当前G的PC/SP到G的gobuf的pc/sp字段,第二部分调用park_m函数:
park_m函数的功能分为三部分,第一部分让当前G和当前M脱离关系,第二部分是调用解锁函数,这里是调用netpoll.go源文件中的netpollblockcommit函数:
netpollblockcommit函数,设置gpp为pdWait,设置成功返回1,否则返回0。1为true,0为false:
到这里当前goroutine对socket文件描述符的等待IO继续的行为已经完成。过程中首先尽早尝试判断IO是否已经就绪,如果未就绪则挂起当前goroutine,挂起之后再次判断IO是否就绪,如果还未就绪则调度当前
接下来是等待runtime将其唤醒。runtime在执行
在netpoll_epoll.go中的
所以runtime在执行
http://ju.outofmemory.cn/entry/168649
1. TCP的连接对象:
连接对象:
在net.go中有一个名为Conn的接口,提供了对于连接的读写和其他操作:
type Conn interface { Read(b []byte) (n int, err error) Write(b []byte) (n int, err error) Close() error LocalAddr() Addr RemoteAddr() Addr SetReadDeadline(t time.Time) error SetWriteDeadline(t time.Time) error }
这个接口就是对下面的结构体
conn的抽象。
conn结构体包含了对连接的读写和其他操作:
type conn struct { fd *netFD }
从连接读取数据:
// Read implements the Conn Read method. func (c *conn) Read(b []byte) (int, error) { if !c.ok() { return 0, syscall.EINVAL } return c.fd.Read(b) }
向连接写入数据:
// Write implements the Conn Write method. func (c *conn) Write(b []byte) (int, error) { if !c.ok() { return 0, syscall.EINVAL } return c.fd.Write(b) }
关闭连接:
// Close closes the connection. func (c *conn) Close() error { if !c.ok() { return syscall.EINVAL } return c.fd.Close() }
设置读写超时:
// SetDeadline implements the Conn SetDeadline method. func (c *conn) SetDeadline(t time.Time) error { if !c.ok() { return syscall.EINVAL } return c.fd.setDeadline(t) } // SetReadDeadline implements the Conn SetReadDeadline method. func (c *conn) SetReadDeadline(t time.Time) error { if !c.ok() { return syscall.EINVAL } return c.fd.setReadDeadline(t) } // SetWriteDeadline implements the Conn SetWriteDeadline method. func (c *conn) SetWriteDeadline(t time.Time) error { if !c.ok() { return syscall.EINVAL } return c.fd.setWriteDeadline(t) }
可以看到,对连接的所有操作,都体现在对*netFD
的操作上。我们继续跟踪c.fd.Read()
函数.
2.文件描述符
net/fd_unix.go:
网络连接的文件描述符:
// Network file descriptor. type netFD struct { // locking/lifetime of sysfd + serialize access to Read and Write methods fdmu fdMutex // immutable until Close sysfd int family int sotype int isConnected bool net string laddr Addr raddr Addr // wait server pd pollDesc }
文件描述符读取数据:
func (fd *netFD) Read(p []byte) (n int, err error) { if err := fd.readLock(); err != nil { return 0, err } defer fd.readUnlock() if err := fd.pd.PrepareRead(); err != nil { return 0, &OpError{"read", fd.net, fd.raddr, err} } // 调用system call,循环从fd.sysfd读取数据 for { // 系统调用Read读取数据 n, err = syscall.Read(int(fd.sysfd), p) // 如果发生错误,则需要处理 // 并且只处理EAGAIN类型的错误,其他错误一律返回给调用者 if err != nil { n = 0 // 对于非阻塞的网络连接的文件描述符,如果错误是EAGAIN // 说明Socket的缓冲区为空,未读取到任何数据 // 则调用fd.pd.WaitRead, if err == syscall.EAGAIN { if err = fd.pd.WaitRead(); err == nil { continue } } } err = chkReadErr(n, err, fd) break } if err != nil && err != io.EOF { err = &OpError{"read", fd.net, fd.raddr, err} } return }
网络轮询器
网络轮询器是Golang中针对每个socket文件描述符建立的轮询机制。 此处的轮询并不是一般意义上的轮询,而是Golang的runtime在调度goroutine或者GC完成之后或者指定时间之内,调用epoll_wait获取所有产生IO事件的socket文件描述符。当然在runtime轮询之前,需要将socket文件描述符和当前goroutine的相关信息加入epoll维护的数据结构中,并挂起当前goroutine,当IO就绪后,通过epoll返回的文件描述符和其中附带的goroutine的信息,重新恢复当前goroutine的执行。// Integrated network poller (platform-independent part). // 网络轮询器(平台独立部分) // A particular implementation (epoll/kqueue) must define the following functions: // 实际的实现(epoll/kqueue)必须定义以下函数: // func netpollinit() // to initialize the poller,初始化轮询器 // func netpollopen(fd uintptr, pd *pollDesc) int32 // to arm edge-triggered notifications, 为fd和pd启动边缘触发通知 // and associate fd with pd. // 一个实现必须调用下面的函数,用来指示pd已经准备好 // An implementation must call the following function to denote that the pd is ready. // func netpollready(gpp **g, pd *pollDesc, mode int32) // pollDesc contains 2 binary semaphores, rg and wg, to park reader and writer // goroutines respectively. The semaphore can be in the following states: // pollDesc包含了2个二进制的信号,分别负责读写goroutine的暂停. // 信号可能处于下面的状态: // pdReady - IO就绪通知被挂起; // 一个goroutine将次状态置为nil来消费一个通知。 // pdReady - io readiness notification is pending; // a goroutine consumes the notification by changing the state to nil. // pdWait - 一个goroutine准备暂停在信号上,但是还没有完成暂停。 // 这个goroutine通过把这个状态改变为G指针去提交这个暂停动作。 // 或者,替代性的,并行的其他通知将状态改变为READY. // 或者,替代性的,并行的超时/关闭会将次状态变为nil // pdWait - a goroutine prepares to park on the semaphore, but not yet parked; // the goroutine commits to park by changing the state to G pointer, // or, alternatively, concurrent io notification changes the state to READY, // or, alternatively, concurrent timeout/close changes the state to nil. // G指针 - 阻塞在信号上的goroutine // IO通知或者超时/关闭会分别将此状态置为READY或者nil. // G pointer - the goroutine is blocked on the semaphore; // io notification or timeout/close changes the state to READY or nil respectively // and unparks the goroutine. // nil - nothing of the above. const ( pdReady uintptr = 1 pdWait uintptr = 2 )
网络轮询器的数据结构如下:
// Network poller descriptor. // 网络轮询器描述符 type pollDesc struct { link *pollDesc // in pollcache, protected by pollcache.lock // The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations. // This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime. // pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification) // proceed w/o taking the lock. So closing, rg, rd, wg and wd are manipulated // in a lock-free way by all operations. // NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg), // that will blow up when GC starts moving objects. // // lock锁对象保护了pollOpen, pollSetDeadline, pollUnblock和deadlineimpl操作。 // 而这些操作又完全包含了对seq, rt, tw变量。 // fd在PollDesc整个生命过程中都是一个常量。 // 处理pollReset, pollWait, pollWaitCanceled和runtime.netpollready(IO就绪通知)不需要用到锁。 // 所以closing, rg, rd, wg和wd的所有操作都是一个无锁的操作。 lock mutex // protectes the following fields fd uintptr closing bool seq uintptr // protects from stale timers and ready notifications rg uintptr // pdReady, pdWait, G waiting for read or nil rt timer // read deadline timer (set if rt.f != nil) rd int64 // read deadline wg uintptr // pdReady, pdWait, G waiting for write or nil wt timer // write deadline timer wd int64 // write deadline user unsafe.Pointer // user settable cookie }
将当前goroutine设置为阻塞在fd上:
pd.WaitRead():
func (pd *pollDesc) WaitRead() error { return pd.Wait('r') } func (pd *pollDesc) Wait(mode int) error { res := runtime_pollWait(pd.runtimeCtx, mode) return convertErr(res) }
res是runtime_pollWait函数返回的结果,由conevertErr函数包装后返回:
func convertErr(res int) error { switch res { case 0: return nil case 1: return errClosing case 2: return errTimeout } println("unreachable: ", res) panic("unreachable") }
函数返回0,表示IO已经准备好,返回nil。
返回1,说明连接已关闭,应该放回errClosing。
返回2,说明对IO进行的操作发生超时,应该返回errTimeout。
runtime_pollWait会调用runtime/thunk.s中的函数:
TEXT net·runtime_pollWait(SB),NOSPLIT,$0-0 JMP runtime·netpollWait(SB)
这是一个包装函数,没有参数,直接跳转到runtime/netpoll.go中的函数netpollWait:
func netpollWait(pd *pollDesc, mode int) int { // 检查pd的状态是否异常 err := netpollcheckerr(pd, int32(mode)) if err != 0 { return err } // As for now only Solaris uses level-triggered IO. if GOOS == "solaris" { onM(func() { netpollarm(pd, mode) }) } // 循环中检查pd的状态是不是已经被设置为pdReady // 即检查IO是不是已经就绪 for !netpollblock(pd, int32(mode), false) { err = netpollcheckerr(pd, int32(mode)) if err != 0 { return err } // Can happen if timeout has fired and unblocked us, // but before we had a chance to run, timeout has been reset. // Pretend it has not happened and retry. } return 0 }
netpollcheckerr函数检查pd是否出现异常:
// 检查pd的异常 func netpollcheckerr(pd *pollDesc, mode int32) int { // 是否已经关闭 if pd.closing { return 1 // errClosing } // 当读写状态下,deadline小于0,表示pd已经过了超时时间 if (mode == 'r' && pd.rd < 0) || (mode == 'w' && pd.wd < 0) { return 2 // errTimeout } // 正常情况返回0 return 0 }
netpollblock():
// returns true if IO is ready, or false if timedout or closed // waitio - wait only for completed IO, ignore errors // 这个函数被netpollWait循环调用 // 返回true说明IO已经准备好,返回false说明IO操作已经超时或者已经关闭 func netpollblock(pd *pollDesc, mode int32, waitio bool) bool { // 获取pd的rg gpp := &pd.rg // 如果模式是w,则获取pd的wg if mode == 'w' { gpp = &pd.wg } // set the gpp semaphore to WAIT // 在循环中设置pd的gpp为pdWait // 因为casuintptr是自旋锁,所以需要在循环中调用 for { // 如果在循环中发现IO已经准备好(pg的rg或者wg为pdReady状态) // 则设置rg/wg为0,返回true old := *gpp if old == pdReady { *gpp = 0 return true } // 每次netpollblock执行完毕之后,gpp重置为0 // 非0表示重复wait if old != 0 { gothrow("netpollblock: double wait") } // CAS操作改变gpp为pdWait if casuintptr(gpp, 0, pdWait) { break } } // need to recheck error states after setting gpp to WAIT // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl // do the opposite: store to closing/rd/wd, membarrier, load of rg/wg // // 当设置gpp为pdWait状态后,重新检查gpp的状态 // 这是必要的,因为runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl会做相反的操作 // 如果状态正常则挂起当前的goroutine // // 当netpollcheckerr检查io出现超时或者错误,waitio为true可用于等待ioReady // 否则当waitio为false, 且io不出现错误或者超时才会挂起当前goroutine if waitio || netpollcheckerr(pd, mode) == 0 { // 解锁函数,设置gpp为pdWait,如果设置不成功 // 说明已经是发生其他事件,可以让g继续运行,而不是挂起当前g f := netpollblockcommit // 尝试挂起当前g gopark(**(**unsafe.Pointer)(unsafe.Pointer(&f)), unsafe.Pointer(gpp), "IO wait") } // be careful to not lose concurrent READY notification old := xchguintptr(gpp, 0) if old > pdWait { gothrow("netpollblock: corrupted state") } return old == pdReady }
runtime/proc.go:
gopark():
// Puts the current goroutine into a waiting state and calls unlockf. // If unlockf returns false, the goroutine is resumed. // 将当前goroutine置为waiting状态,然后调用unlockf func gopark(unlockf unsafe.Pointer, lock unsafe.Pointer, reason string) { // 获取当前M mp := acquirem() // 获取当前G gp := mp.curg // 获取G的状态 status := readgstatus(gp) // 如果不是_Grunning或者_Gscanrunning,则报错 if status != _Grunning && status != _Gscanrunning { gothrow("gopark: bad g status") } // 设置lock和unlockf mp.waitlock = lock mp.waitunlockf = unlockf gp.waitreason = reason releasem(mp) // can't do anything that might move the G between Ms here. // 在m->g0这个栈上调用park_m,而不是当前g的栈 mcall(park_m) }
mcall函数是一段汇编,在m->g0的栈上调用park_m,而不是在当前goroutine的栈上。mcall的功能分两部分,第一部分保存当前G的PC/SP到G的gobuf的pc/sp字段,第二部分调用park_m函数:
// func mcall(fn func(*g)) // Switch to m->g0's stack, call fn(g). // Fn must never return. It should gogo(&g->sched) // to keep running g. TEXT runtime·mcall(SB), NOSPLIT, $0-8 // 将需要执行的函数保存在DI MOVQ fn+0(FP), DI // 将M的TLS存放在CX get_tls(CX) // 将G对象存放在AX MOVQ g(CX), AX // save state in g->sched // 将调用者的PC存放在BX MOVQ 0(SP), BX // caller's PC // 将调用者的PC保存到g->sched.pc MOVQ BX, (g_sched+gobuf_pc)(AX) // 第一个参数的地址,即栈顶的地址,保存到BX LEAQ fn+0(FP), BX // caller's SP // 保存SP的地址到g->sched.sp MOVQ BX, (g_sched+gobuf_sp)(AX) // 将g对象保存到g->sched->g MOVQ AX, (g_sched+gobuf_g)(AX) // switch to m->g0 & its stack, call fn // 将g对象指针保存到BX MOVQ g(CX), BX // 将g->m保存到BX MOVQ g_m(BX), BX // 将m->g0保存到SI MOVQ m_g0(BX), SI CMPQ SI, AX // if g == m->g0 call badmcall JNE 3(PC) MOVQ $runtime·badmcall(SB), AX JMP AX // 将m->g0保存到g MOVQ SI, g(CX) // g = m->g0 // 将g->sched.sp恢复到SP寄存器 // 即使用g0的栈 MOVQ (g_sched+gobuf_sp)(SI), SP // sp = m->g0->sched.sp // AX进栈 PUSHQ AX MOVQ DI, DX // 将fn的地址复制到DI MOVQ 0(DI), DI // 调用函数 CALL DI // AX出栈 POPQ AX MOVQ $runtime·badmcall2(SB), AX JMP AX RET
park_m函数的功能分为三部分,第一部分让当前G和当前M脱离关系,第二部分是调用解锁函数,这里是调用netpoll.go源文件中的netpollblockcommit函数:
// runtime·park continuation on g0. void runtime·park_m(G *gp) { bool ok; // 设置当前g为Gwaiting状态 runtime·casgstatus(gp, Grunning, Gwaiting); // 让当前g和m脱离关系 dropg(); if(g->m->waitunlockf) { ok = g->m->waitunlockf(gp, g->m->waitlock); g->m->waitunlockf = nil; g->m->waitlock = nil; // 返回0为false,非0为true // 0说明g->m->waitlock发生了变化,即不是在gopark是设置的(pdWait) // 说明了脱离了WAIT状态,应该设置为Grunnable,并执行g if(!ok) { runtime·casgstatus(gp, Gwaiting, Grunnable); execute(gp); // Schedule it back, never returns. } } // 这里是调度当前m继续执行其他g // 而不是上面执行execute schedule(); }
netpollblockcommit函数,设置gpp为pdWait,设置成功返回1,否则返回0。1为true,0为false:
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool { return casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp))) }
到这里当前goroutine对socket文件描述符的等待IO继续的行为已经完成。过程中首先尽早尝试判断IO是否已经就绪,如果未就绪则挂起当前goroutine,挂起之后再次判断IO是否就绪,如果还未就绪则调度当前
M运行其他
G。如果是在调度goroutine之前IO已经就绪,则不会使当前goroutine进入调度队列,会直接运行刚才挂起的G。否则当前goroutine会进入调度队列。
接下来是等待runtime将其唤醒。runtime在执行
findrunnablequeue、
starttheworld,
sysmon函数时,都会调用netpoll_epoll.go中的
netpoll函数,寻找到IO就绪的socket文件描述符,并找到这些socket文件描述符对应的轮询器中附带的信息,根据这些信息将之前等待这些socket文件描述符就绪的goroutine状态修改为Grunnable。在以上函数中,执行完netpoll之后,会找到一个就绪的goroutine列表,接下来将就绪的goroutine加入到调度队列中,等待调度运行。
在netpoll_epoll.go中的
netpoll函数中,
epoll_wait函数返回N个发生事件的文件描述符对应的epollevent,接着对于每个event使用其data属性,将
event.data转换为
*pollDesc类型,再调用netpoll.go中的netpollready函数,将
*pollDesc类型中的
G数据类型去除,并附加到
netpoll函数的调用者传递的G链表中:
// 将ev.data转换为*pollDesc类型 pd := *(**pollDesc)(unsafe.Pointer(&ev.data)) // 调用netpollready将取出pd中保存的G,并添加到链表中 netpollready((**g)(noescape(unsafe.Pointer(&gp))), pd, mode)
所以runtime在执行
findrunnablequeue、
starttheworld,
sysmon函数中会执行
netpoll函数,并返回N个goroutine。这些goroutine期待的网络事件已经发生,runtime会将这些goroutine放入到当前
P的可运行队列中,接下来调度它们并运行。
http://ju.outofmemory.cn/entry/168649
相关文章推荐
- HTTP协议
- 矢量网络分析仪--测天线时使用技巧
- H264参数结构二:网络提取层NAL (Net Abstraction Layer) & 视频编码层VCL (Video Coding Layer)
- cocos2dx 3.x lua http请求网络图片,自己封装的,很好用,需要的可以看看
- 网络API下获取天气json数据信息
- delphi 7中使用idhttp抓取网页 解决假死现象
- hdu3572(网络流)
- (http://fonts.googleapis.com/css?)打开很慢解决方案
- 网络设备入门测试
- HTTPS_SSLTT介绍及其配置
- TCP/IP、Http/Soap协议-基本认识
- ISO/OSI网络体系结构和TCP/IP协议模型
- 【chrome开发者工具介绍】评估网络性能
- TCP/IP 网络协议基础
- The method getContextPath() from the type HttpServletRequest
- http://www.cnblogs.com/ITtangtang/archive/2012/05/21/2511749.html
- http与http2.0
- SPP-Net 与 RCNN 网络的区别
- ajax原理和XmlHttpRequest对象
- rtp/rtcp header