您的位置:首页 > 理论基础 > 计算机网络

linux 内核tcp数据发送的实现

2012-06-27 12:48 489 查看
在分析之前先来看下SO_RCVTIMEO和SO_SNDTIMEO套接口吧,前面分析代码时没太注意这两个.这里算是个补充.

SO_RCVTIMEO和SO_SNDTIMEO套接口选项可以给套接口的读和写,来设置超时时间,在unix网络编程中,说是他们只能用于读和写,而像accept和connect都不能用他们来设置.可是我在阅读内核源码的过程中看到,在linux中,accept和connect可以分别用SO_RCVTIMEO和SO_SNDTIMEO套接口来设置超时,这里他们的超时时间也就是sock的sk_rcvtimeo和sk_sndtimeo域.accept和connect的相关代码我前面都介绍过了,这里再提一下.其中accept的相关部分在inet_csk_accept中,会调用sock_rcvtimeo来取得超时时间(如果是非阻塞则忽略超时间).而connect的相关代码在inet_stream_connect中通过调用sock_sndtimeo来取得超时时间(如果非阻塞则忽略超时时间).

---------------------------------------------------------------------------------

tcp发送数据最终都会调用到tcp_sendmsg,举个例子吧,比如send系统调用.

send系统调用会z直接调用sys_sendto,然后填充msghdr数据结构,并调用sock_sendmsg,而在他中,则最终会调用__sock_sendmsg.在这个函数里面会初始化sock_iocb结构,然后调用tcp_sendmsg.

在sys_sendto中还会做和前面几个系统调用差不多的操作,就是通过fd得到socket,在sock_sendmsg中则会设置aio所需的操作.

我们简要的看下__sock_sendmsg的实现.可以看到在内核中数据都是用msghdr来表示的(也就是会将char *转为msghdr),而这个结构这里就不介绍了,unix网络编程里面有详细的介绍.而struct kiocb则是aio会用到的.

Java代码


static inline int __sock_sendmsg(struct kiocb *iocb, struct socket *sock,

struct msghdr *msg, size_t size)

{

struct sock_iocb *si = kiocb_to_siocb(iocb);

int err;

si->sock = sock;

si->scm = NULL;

si->msg = msg;

si->size = size;

err = security_socket_sendmsg(sock, msg, size);

if (err)

return err;

///这里就会调用tcp_sendmsg.

return sock->ops->sendmsg(iocb, sock, msg, size);

}

我们在前面知道tcp将数据传递给ip层的时候调用ip_queue_xmit,而在这个函数没有做任何切片的工作,切片的工作都在tcp层完成了.而udp则是需要在ip层进行切片(通过ip_append_data).
而tcp的数据是字节流的,因此在

tcp_sendmsg中主要做的工作就是讲字节流分段(根据mss),然后传递给ip层. 可以看到它的任务和ip_append_data很类似,流程其实也差不多. 所以有兴趣的可以看下我前面的blog

而在tcp_sendmsg中也是要看网卡是否支持Scatter/Gather I/O,从而进行相关操作.

下面我们来看它的实现,我们分段来看:

Java代码


///首先取出句柄的flag,主要是看是非阻塞还是阻塞模式.

flags = msg->msg_flags;

///这里取得发送超时时间.

timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);

///如果connect还没有完成则等待连接完成(如是非阻塞则直接返回).

if ((1 << sk->sk_state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT))

if ((err = sk_stream_wait_connect(sk, &timeo)) != 0)

goto out_err;

/* This should be in poll */

clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);

///取出当前的mss,在tcp_current_mss还会设置xmit_size_goal,这个值一般都是等于mss,除非有gso的情况下,有所不同.这里我们就认为他是和mms相等的.

mss_now = tcp_current_mss(sk, !(flags&MSG_OOB));

size_goal = tp->xmit_size_goal;

在取得了相关的值之后我们进入循环处理msg,我们知道msghdr有可能是包含很多buffer的,因此这里我们分为两层循环,一层是遍历msg的buffer,一层是对buffer进行处理(切包或者组包)并发送给ip层.

首先来看当buf空间不够时的情况,它这里判断buf空间是否足够是通过

Java代码


!tcp_send_head(sk) ||

(copy = size_goal - skb->len) <= 0

来判断的,这里稍微解释下这个:

这里tcp_send_head返回值为sk->sk_send_head,也就是指向当前的将要发送的buf的位置.如果为空,则说明buf没有空间,我们就需要alloc一个段来保存将要发送的msg.

而skb->len指的是当前的skb的所包含的数据的大小(包含头的大小).而这个值如果大于size_goal,则说明buf已满,我们需要重新alloc一个端.如果小于size_goal,则说明buf还有空间来容纳一些数据来组成一个等于mss的数据包再发送给ip层.

Java代码


/* Ok commence sending. */

iovlen = msg->msg_iovlen;

iov = msg->msg_iov;

///copy的大小

copied = 0;

err = -EPIPE;

///如果发送端已经完全关闭则返回,并设置errno.

if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN))

goto do_error;

while (--iovlen >= 0) {

///取得当前buf长度

int seglen = iov->iov_len;

///buf的基地址.

unsigned char __user *from = iov->iov_base;

iov++;

while (seglen > 0) {

int copy;

///我们知道sock的发送队列sk_write_queue是一个双向链表,而用tcp_write_queue_tail则是取得链表的最后一个元素.(如果链表为空则返回NULL).

skb = tcp_write_queue_tail(sk);

///上面介绍过了.主要是判断buf是否有空闲空间.

if (!tcp_send_head(sk) ||

(copy = size_goal - skb->len) <= 0) {

new_segment:

///开始alloc一个新的段.

if (!sk_stream_memory_free(sk))

goto wait_for_sndbuf;

///alloc的大小一般都是等于mss的大小,这里是通过select_size得到的.

skb = sk_stream_alloc_skb(sk, select_size(sk),

sk->sk_allocation);

if (!skb)

goto wait_for_memory;

/*

* Check whether we can use HW checksum.

*/

if (sk->sk_route_caps & NETIF_F_ALL_CSUM)

skb->ip_summed = CHECKSUM_PARTIAL;

///将这个skb加入到sk_write_queue队列中,并更新sk_send_head域.

skb_entail(sk, skb);

///将copy值更新.

copy = size_goal;

}

接下来如果走到这里,则说明 要么已经alloc一个新的buf,要么当前的buf中还有空闲空间.

这里先来分析alloc一个新的buf的情况.

这里先看下skb中的几个域的含义:



head and end 指的是alloc了的buf的起始和终止位置,而data and tail 指的是数据段的起始和终止位置,因此经过每一层tail和data都会变化的,而初始值这两个是相等的.

我们来看skb_tailroom,它主要是用来判断得到当前的skb的tailroom的大小.tailroom也就是当前buf的剩余数据段的大小,这里也就是用来判断当前buf是否能够再添加数据.

Java代码


static inline int skb_is_nonlinear(const struct sk_buff *skb)

{

return skb->data_len;

}

static inline int skb_tailroom(const struct sk_buff *skb)

{

///如果是新alloc的skb则会返回tailroom否则返回0

return skb_is_nonlinear(skb) ? 0 : skb->end - skb->tail;

}

接下来来看代码:

Java代码


while (--iovlen >= 0) {

...........................

while (seglen > 0) {

///如果copy大于buf的大小,则缩小copy.

if (copy > seglen)

copy = seglen;

///这里查看skb的空间.如果大于0,则说明是新建的skb.

if (skb_tailroom(skb) > 0) {

///如果需要复制的数据大于所剩的空间,则先复制当前skb所能容纳的大小.

if (copy > skb_tailroom(skb))

copy = skb_tailroom(skb);

///复制数据到sk_buff.大小为copy.如果成功进入do_fault,(我们下面会分析)

if ((err = skb_add_data(skb, from, copy)) != 0)

goto do_fault;

}

如果走到这一步,当前的sk buff中有空闲空间 也分两种情况,一种是 设备支持Scatter/Gather I/O(原理和udp的ip_append_data一样,可以看我以前的blog).

另外一种情况是设备不支持S/G IO,可是mss变大了.这种情况下我们需要返回new_segment,新建一个段,然后再处理.

\我建议在看这段代码前,可以看下我前面blog分析ip_append_data的那篇.因为那里对S/G IO的设备处理切片的分析比较详细,而这里和那边处理基本类似.这里我对frags的操作什么的都是很简单的描述,详细的在ip_append_data那里已经描述过.

然后再来了解下PSH标记,这个标记主要是用来使接收方将sk->receive_queue上缓存的skb提交给用户进程.详细的介绍可以看tcp协议的相关部分(推功能).在这里设置这个位会有两种情况,第一种是我们写了超过一半窗口大小的数据,此时我们需要标记最后一个段的PSH位.或者我们有一个完整的tcp段发送出去,此时我们也需要标记pSH位.

Java代码


while (--iovlen >= 0) {

...........................

while (seglen > 0) {

...............................

else {

int merge = 0;

///取得nr_frags也就是保存物理页的数组.

int i = skb_shinfo(skb)->nr_frags;

///从socket取得当前的发送物理页.

struct page *page = TCP_PAGE(sk);

///取得当前页的位移.

int off = TCP_OFF(sk);

///这里主要是判断skb的发送页是否已经存在于nr_frags中,如果存在并且也没有满,则我们只需要将数据合并到这个页就可以了,而不需要在frag再添加一个页.

if (skb_can_coalesce(skb, i, page, off) &&

off != PAGE_SIZE) {

merge = 1;

} else if (i == MAX_SKB_FRAGS ||

(!i &&

!(sk->sk_route_caps & NETIF_F_SG))) {

///到这里说明要么设备不支持SG IO,要么页已经满了.因为我们知道nr_frags的大小是有限制的.此时调用tcp_mark_push来加一个PSH标记.

tcp_mark_push(tp, skb);

goto new_segment;

} else if (page) {

if (off == PAGE_SIZE) {

///这里说明当前的发送页已满.

put_page(page);

TCP_PAGE(sk) = page = NULL;

off = 0;

}

} else

off = 0;

if (copy > PAGE_SIZE - off)

copy = PAGE_SIZE - off;

.................................

///如果page为NULL则需要新alloc一个物理页.

if (!page) {

/* Allocate new cache page. */

if (!(page = sk_stream_alloc_page(sk)))

goto wait_for_memory;

}

///开始复制数据到这个物理页.

err = skb_copy_to_page(sk, from, skb, page,

off, copy);

if (err) {

///出错的情况.

if (!TCP_PAGE(sk)) {

TCP_PAGE(sk) = page;

TCP_OFF(sk) = 0;

}

goto do_error;

}

///判断是否为新建的物理页.

if (merge) {

///如果只是在存在的物理页添加数据,则只需要更新size

skb_shinfo(skb)->frags[i - 1].size +=

copy;

} else {

///负责添加此物理页到skb的frags.

skb_fill_page_desc(skb, i, page, off, copy);

if (TCP_PAGE(sk)) {

///设置物理页的引用计数.

get_page(page);

} else if (off + copy < PAGE_SIZE) {

get_page(page);

TCP_PAGE(sk) = page;

}

}

///设置位移.

TCP_OFF(sk) = off + copy;

}

数据复制完毕,接下来就该发送数据了.

这里我们要知道几个tcp_push,tcp_one_push最终都会调用__tcp_push_pending_frames,而在它中间最终会调用tcp_write_xmit,而tcp_write_xmit则会调用tcp_transmit_skb,这个函数最终会调用ip_queue_xmit来讲数据发送给ip层.这里要注意,我们这里的分析忽略掉了,tcp的一些管理以及信息交互的过程.

接下来看数据传输之前先来分析下TCP_PUSH几个函数的实现,tcp_push这几个类似函数的最后一个参数都是一个控制nagle算法的参数,来看下这几个函数的原型:

Java代码


static inline void tcp_push(struct sock *sk, int flags, int mss_now,

int nonagle)

void __tcp_push_pending_frames(struct sock *sk, unsigned int cur_mss,

int nonagle)

static int tcp_write_xmit(struct sock *sk, unsigned int mss_now, int nonagle)

我们还要知道tcp sock有一个nonagle域,这个域是会被tcp_cork套接口选项时被设置为TCP_NAGLE_CORK .先来看tcp_push的实现:

Java代码


static inline void tcp_push(struct sock *sk, int flags, int mss_now,

int nonagle)

{

struct tcp_sock *tp = tcp_sk(sk);

if (tcp_send_head(sk)) {

struct sk_buff *skb = tcp_write_queue_tail(sk);

///MSG_MORE这个参数我们在ip_append_data那里已经介绍过了,就是告诉ip层,我这里主要是一些小的数据包,然后ip层就会提前划分一个mtu大小的buf,然后等待数据的到来.因此如果没有设置这个或者forced_push返回真(我们写了超过最大窗口一般的数据),就标记一个PSH.

if (!(flags & MSG_MORE) || forced_push(tp))

tcp_mark_push(tp, skb);

tcp_mark_urg(tp, flags, skb);

///这里还是根据是否有设置MSG_MORE来判断使用哪个flags.因此可以看到如果我们设置了tcp_cork套接字选项和设置msg的MSG_MORE比较类似.最终调用tcp_push都会传递给__tcp_push_pending_frames的参数为TCP_NAGLE_CORK .

__tcp_push_pending_frames(sk, mss_now,

(flags & MSG_MORE) ? TCP_NAGLE_CORK : nonagle);

}

}

在看tcp_write_xmit之前,我们先来看下tcp_nagle_test,这个函数主要用来检测nagle算法.如果当前允许数据段立即被发送,则返回1,否则为0.

Java代码


///这个函数就不介绍了,内核的注释很详细.

/* Return 0, if packet can be sent now without violation Nagle's rules:

* 1. It is full sized.

* 2. Or it contains FIN. (already checked by caller)

* 3. Or TCP_NODELAY was set.

* 4. Or TCP_CORK is not set, and all sent packets are ACKed.

* With Minshall's modification: all sent small packets are ACKed.

*/

static inline int tcp_nagle_check(const struct tcp_sock *tp,

const struct sk_buff *skb,

unsigned mss_now, int nonagle)

{

return (skb->len < mss_now &&

((nonagle & TCP_NAGLE_CORK) ||

(!nonagle && tp->packets_out && tcp_minshall_check(tp))));

}

static inline int tcp_nagle_test(struct tcp_sock *tp, struct sk_buff *skb,

unsigned int cur_mss, int nonagle)

{

///如果设置了TCP_NAGLE_PUSH则返回1,也就是数据可以立即发送

if (nonagle & TCP_NAGLE_PUSH)

return 1;

/* Don't use the nagle rule for urgent data (or for the final FIN).

* Nagle can be ignored during F-RTO too (see RFC4138).

*/

if (tcp_urg_mode(tp) || (tp->frto_counter == 2) ||

(TCP_SKB_CB(skb)->flags & TCPCB_FLAG_FIN))

return 1;

///再次检测 nonagle域,相关的检测,上面已经说明了.

if (!tcp_nagle_check(tp, skb, cur_mss, nonagle))

return 1;

return 0;

}

然后看下tcp_write_xmit的实现,

Java代码


static int tcp_write_xmit(struct sock *sk, unsigned int mss_now, int nonagle)

{

struct tcp_sock *tp = tcp_sk(sk);

struct sk_buff *skb;

unsigned int tso_segs, sent_pkts;

int cwnd_quota;

int result;

///检测状态.

if (unlikely(sk->sk_state == TCP_CLOSE))

return 0;

sent_pkts = 0;

///探测mtu.

if ((result = tcp_mtu_probe(sk)) == 0) {

return 0;

} else if (result > 0) {

sent_pkts = 1;

}

///开始处理数据包.

while ((skb = tcp_send_head(sk))) {

unsigned int limit;

tso_segs = tcp_init_tso_segs(sk, skb, mss_now);

BUG_ON(!tso_segs);

///主要用来测试congestion window..

cwnd_quota = tcp_cwnd_test(tp, skb);

if (!cwnd_quota)

break;

if (unlikely(!tcp_snd_wnd_test(tp, skb, mss_now)))

break;

if (tso_segs == 1) {

///主要看这里,如果这个skb是写队列的最后一个buf,则传输TCP_NAGLE_PUSH给tcp_nagle_test,这个时侯直接返回1,于是接着往下面走,否则则说明数据包不要求理解发送,我们就跳出循环(这时数据段就不会被发送).比如设置了TCP_CORK.

if (unlikely(!tcp_nagle_test(tp, skb, mss_now,

(tcp_skb_is_last(sk, skb) ?

nonagle : TCP_NAGLE_PUSH))))

break;

} else {

if (tcp_tso_should_defer(sk, skb))

break;

}

limit = mss_now;

if (tso_segs > 1 && !tcp_urg_mode(tp))

limit = tcp_mss_split_point(sk, skb, mss_now,

cwnd_quota);

if (skb->len > limit &&

unlikely(tso_fragment(sk, skb, limit, mss_now)))

break;

TCP_SKB_CB(skb)->when = tcp_time_stamp;

///传输数据给3层.

if (unlikely(tcp_transmit_skb(sk, skb, 1, GFP_ATOMIC)))

break;

/* Advance the send_head. This one is sent out.

* This call will increment packets_out.

*/

tcp_event_new_data_sent(sk, skb);

tcp_minshall_update(tp, mss_now, skb);

sent_pkts++;

}

if (likely(sent_pkts)) {

tcp_cwnd_validate(sk);

return 0;

}

return !tp->packets_out && tcp_send_head(sk);

}

然后返回来,来看刚才紧接着的实现:

Java代码


while (--iovlen >= 0) {

...........................

while (seglen > 0) {

...............................

///如果第一次组完一个段,则设置PSH.

if (!copied)

TCP_SKB_CB(skb)->flags &= ~TCPCB_FLAG_PSH;

///然后设置写队列长度.

tp->write_seq += copy;

TCP_SKB_CB(skb)->end_seq += copy;

skb_shinfo(skb)->gso_segs = 0;

///更新buf基地址以及复制的buf大小.

from += copy;

copied += copy;

///buf已经复制完则退出循环.并发送这个段.

if ((seglen -= copy) == 0 && iovlen == 0)

goto out;

///如果skb的数据大小小于所需拷贝的数据大小或者存在带外数据,我们继续循环,而当存在带外数据时,我们接近着的循环会退出循环,然后调用tcp_push将数据发出.

if (skb->len < size_goal || (flags & MSG_OOB))

continue;

///forced_push用来判断我们是否已经写了多于一半窗口大小的数据到对端.如果是,我们则要发送一个推数据(PSH).

if (forced_push(tp)) {

tcp_mark_push(tp, skb);

///调用__tcp_push_pending_frames将开启NAGLE算法的缓存的段全部发送出去.

__tcp_push_pending_frames(sk, mss_now, TCP_NAGLE_PUSH);

} else if (skb == tcp_send_head(sk))

///如果当前将要发送的buf刚好为skb,则会传发送当前的buf

tcp_push_one(sk, mss_now);

continue;

wait_for_sndbuf:

set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);

wait_for_memory:

if (copied)

///内存不够,则尽量将本地的NAGLE算法所缓存的数据发送出去.

tcp_push(sk, flags & ~MSG_MORE, mss_now, TCP_NAGLE_PUSH);

if ((err = sk_stream_wait_memory(sk, &timeo)) != 0)

goto do_error;

///更新相关域.

mss_now = tcp_current_mss(sk, !(flags&MSG_OOB));

size_goal = tp->xmit_size_goal;

}

}

最后来看下出错或者成功tcp_sendmsg所做的:

Java代码


out:

///这里是成功返回所做的.

if (copied)

///这里可以看到最终的flag是tp->nonagle,而这个就是看套接口选项是否有开nagle算法,如果没开的话,立即把数据发出去,否则则会村讯nagle算法,将小数据缓存起来.

tcp_push(sk, flags, mss_now, tp->nonagle);

TCP_CHECK_TIMER(sk);

release_sock(sk);

return copied;

do_fault:

if (!skb->len) {

///从write队列unlink掉当前的buf.

tcp_unlink_write_queue(skb, sk);

///更新send)head

tcp_check_send_head(sk, skb);

///释放skb.

sk_wmem_free_skb(sk, skb);

}

do_error:

if (copied)

///如果copied不为0,则说明发送成功一部分数据,因此此时返回out.

goto out;

out_err:

///否则进入错误处理.

err = sk_stream_error(sk, flags, err);

TCP_CHECK_TIMER(sk);

release_sock(sk);

return err;
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: