您的位置:首页 > 理论基础 > 计算机网络

linux网络编程之posix 线程(一):线程模型、pthread 系列函数 和 简单多线程服务器端程序

2013-06-16 19:08 656 查看
一、线程有3种模型,分别是N:1用户线程模型,1:1核心线程模型和N:M混合线程模型,posix thread属于1:1模型。

(一)、N:1用户线程模型

“线程实现”建立在“进程控制”机制之上,由用户空间的程序库来管理。OS内核完全不知道线程信息。这些线程称为用户空间线程。这些线程都工作在“进

程竞争范围”(process contention scope):各个线程在同一进程竞争“被调度的CPU时间”(但不直接和其他进程中的线程竞争)。



在N:1线程模型中,内核不干涉线程的任何生命活动,也不干涉同一进程中的线程环境切换。

在N:1线程模型中,一个进程中的多个线程只能调度到一个CPU,这种约束限制了可用的并行总量。

第二个缺点是如果某个线程执行了一个“阻塞式”操作(如read),那么,进程中的所有线程都会阻塞,直至那个操作结束。为此,一些线程的实现是为

这些阻塞式函数提供包装器,用非阻塞版本替换这些系统调用,以消除这种限制。

(二)、1:1核心线程模型 pthread线程库--NPTL(Native
POSIX Threading Library)

在1:1核心线程模型中,应用程序创建的每一个线程(也有书称为LWP)都由一个核心线程直接管理。OS内核将每一个核心线程都调到系统CPU上,

因此,所有线程都工作在“系统竞争范围”(system contention scope):线程直接和“系统范围”内的其他线程竞争。

这种线程的创建与调度由内核完成,因为这种线程的系统开销比较大(但一般来说,比进程开销小)



(三)、N:M混合线程模型  NGPT(Next Generation POSIX Threads)

N:M混合线程模型提供了两级控制,将用户线程映射为系统的可调度体以实现并行,这个可调度体称为轻量级进程(LWP:light weight process),LWP
再一一映射到核心线程。如下图所示。OS内核将每一个核心线程都调到系统CPU上,因此,所有线程都工作在“系统竞争范围”。



据说一些类UNIX系统(如Solaris)已经实现了比较成熟的M:N线程模型, 其性能比起linux的线程还是有着一定的优势,但不能利用SMP结构。

按照2003年3月NGPT官方网站上的通知,NGPT考虑到NPTL日益广泛地为人所接受,为避免不同的线程库版本引起的混乱,今后将不再进行进一步开发,而今进行支持性的维护工作。也就是说,NGPT已经放弃与NPTL竞争下一代Linux POSIX线程库标准。

二、posix 线程概述

我们知道,进程在各自独立的地址空间中运行,进程之间共享数据需要用进程间通信机制,有些情况需要在一个进程中同时执行多个控制流程,这时候

线程就派上了用场,比如实现一个图形界面的下载软件,一方面需要和用户交互,等待和处理用户的鼠标键盘事件,另一方面又需要同时下载多个文

件,等待和处理从多个网络主机发来的数据,这些任务都需要一个“等待-处理”的循环,可以用多线程实现,一个线程专门负责与用户交互,另外几个线

程每个线程负责和一个网络主机通信。

以前我们讲过,main函数和信号处理函数是同一个进程地址空间中的多个控制流程,多线程也是如此,但是比信号处理函数更加灵活,信号处理函数的

控制流程只是在信号递达时产生,在处理完信号之后就结束,而多线程的控制流程可以长期并存,操作系统会在各线程之间调度和切换,就像在多个进

程之间调度和切换一样。由于同一进程的多个线程共享同一地址空间,因此Text Segment、Data Segment都是共享的,如果定义一个函数,在各线程

中都可以调用,如果定义一个全局变量,在各线程中都可以访问到,除此之外,各线程还共享以下进程资源和环境:

文件描述符表

每种信号的处理方式(SIG_IGN、SIG_DFL或者自定义的信号处理函数)

当前工作目录

用户id和组id

但有些资源是每个线程各有一份的:

线程id

上下文,包括各种寄存器的值、程序计数器和栈指针

栈空间

errno变量

信号屏蔽字

调度优先级

我们将要学习的线程库函数是由POSIX标准定义的,称为POSIX thread或者pthread。在Linux上线程函数位于libpthread共享库中,因此在编译时要加上-lpthread选项。

注:linux 2.6 以后的线程就是由用户态的pthread库实现的.使用pthread以后, 在用户看来, 每一个task_struct就对应一个线程, 而一组线程以及它们所共同引用的一组资源就是一个进程.在linux
2.6中, 内核有了线程组的概念, task_struct结构中增加了一个tgid(thread group id)字段. getpid(获取进程ID)系统调用返回的也是tast_struct中的tgid,
而tast_struct中的pid则由gettid系统调用来返回。
[align=left]
[/align]
[align=left]当线程停止/继续, 或者是收到一个致命信号时, 内核会将处理动作施加到整个线程组中。
[/align]
[align=left]比如程序a.out运行时,创建了一个线程。假设主线程的pid是10001、子线程是10002(它们的tgid都是10001)。这时如果你kill 10002,是可以把10001和10002这两个线程一起杀死的,尽管执行ps命令的时候根本看不到10002这个进程。如果你不知道linux线程背后的故事,肯定会觉得遇到灵异事件了。[/align]

三、pthread 系列函数

(一)

功能:创建一个新的线程

原型 int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void*), void *arg);

参数

thread:返回线程ID

attr:设置线程的属性,attr为NULL表示使用默认属性

start_routine:是个函数地址,线程启动后要执行的函数

arg:传给线程启动函数的参数

返回值:成功返回0;失败返回错误码

错误检查:

以前学过的系统函数都是成功返回0,失败返回-1,而错误号保存在全局变量errno中,而pthread库的函数都是通过返回值返回错误号,虽然每个线程也都有一个errno,但这是为了兼容其它函数接口而提供的,pthread库本身并不使用它,通过返回值返回错误码更加清晰。由于pthread_create的错误码不保存在errno中,因此不能直接用perror(3)打印错误信息,可以先用strerror(3)把错误号转换成错误信息再打印。

(二)

功能:线程终止

原型 void pthread_exit(void *value_ptr);

参数

value_ptr:value_ptr不要指向一个局部变量,因为当其它线程得到这个返回指针时线程函数已经退出了。

返回值:无返回值,跟进程一样,线程结束的时候无法返回到它的调用者(自身)

如果需要只终止某个线程而不终止整个进程,可以有三种方法:

1、从线程函数return。这种方法对主线程不适用,从main函数return相当于调用exit,而如果任意一个线程调用了exit或_exit,则整个进程的所有线程都终止。

2、一个线程可以调用pthread_cancel 终止同一进程中的另一个线程。

3、线程可以调用pthread_exit终止自己。

(三)

功能:等待线程结束

原型 int pthread_join(pthread_t thread, void **value_ptr);

参数

thread:线程ID

value_ptr:它指向一个指针,后者指向线程的返回值

返回值:成功返回0;失败返回错误码

当pthread_create 中的 start_routine返回时,这个线程就退出了,其它线程可以调用pthread_join得到start_routine的返回值,类似于父进程调用wait(2)得到子进程的退出状态。

调用该函数的线程将挂起等待,直到id为thread的线程终止。thread线程以不同的方法终止,通过pthread_join得到的终止状态是不同的,总结如下:

1、如果thread线程通过return返回,value_ptr所指向的单元里存放的是thread线程函数的返回值。

2、如果thread线程被别的线程调用pthread_cancel异常终止掉,value_ptr所指向的单元里存放的是常数PTHREAD_CANCELED。

3、如果thread线程是自己调用pthread_exit终止的,value_ptr所指向的单元存放的是传给pthread_exit的参数。

如果对thread线程的终止状态不感兴趣,可以传NULL给value_ptr参数。

(四)

功能:返回线程ID

原型 pthread_t pthread_self(void);

返回值:成功返回线程id

在Linux上,pthread_t类型是一个地址值,属于同一进程的多个线程调用getpid(2)可以得到相同的进程号,而调用pthread_self(3)得到的线程号各不相同。线程id只在当前进程中保证是唯一的,在不同的系统中pthread_t这个类型有不同的实现,它可能是一个整数值,也可能是一个结构体,也可能是一个地址,所以不能简单地当成整数用printf打印。

(五)

功能:取消一个执行中的线程

原型 int pthread_cancel(pthread_t thread);

参数

thread:线程ID

返回值:成功返回0;失败返回错误码

一个新创建的线程默认取消状态(cancelability state)是可取消的,取消类型( cancelability type)是同步的,即在某个可取消点( cancellation point,即在执行某些函数的时候)才会取消线程。具体可以man 一下。

相关函数 int pthread_setcancelstate(int state, int *oldstate);  int pthread_setcanceltype(int type, int *oldtype); 为保证一个事务型处理逻辑的完整可以使用这两个函数,如下举例,主线程创建完线程睡眠一阵调用pthread_cancel,test是thread_function。

 C++ Code 
1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

void *test(void *arg)

{

    for (int i = 0; i < 10; i++)

    {

        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);

        printf("start: %d; ", i);

        sleep(1);

        printf("end: %d\n", i);

        if (i > 7)

        {

            pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);

            pthread_testcancel();

        }

    }

    return (void *)0;

}
就算我们在完成一次完整逻辑后不立即改回 PTHREAD_CANCEL_ENABLE,就算后续循环再次调用 PTHREAD_CANCEL_DISABLE 设置,其 "未决状态" 依然会保留的。循环执行i=0~8 后,i=9时在sleep可取消点线程被中断。

(六)

功能:将一个线程分离

原型 int pthread_detach(pthread_t thread);

参数

thread:线程ID

返回值:成功返回0;失败返回错误码

一般情况下,线程终止后,其终止状态一直保留到其它线程调用pthread_join获取它的状态为止(僵线程)。但是线程也可以被置为detach状态,这样的线程一旦终止就立刻回收它占用的所有资源,而不保留终止状态。不能对一个已经处于detach状态的线程调用pthread_join,这样的调用将返回EINVAL。对一个尚未detach的线程调用pthread_join或pthread_detach都可以把该线程置为detach状态,也就是说,不能对同一线程调用两次pthread_join,或者如果已经对一个线程调用了pthread_detach就不能再调用pthread_join了。

这个函数既可以在主线程中调用,也可以在thread_function里面调用。

在主线程中通过线程属性也可以达到同样的效果,如下:

 C++ Code 
1

2

3

4

5

6

7

pthread_attr_t attr;

pthread_attr_init(&attr);

pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

pthread_t tid;

pthread_create(&tid, &attr, test, "a"); // test is thread_function
sleep(3);

pthread_attr_destroy(&attr);
下面写个程序走一下这些函数:

 C++ Code 
1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

#include<stdio.h>
#include<stdlib.h>
#include<sys/ipc.h>
#include<sys/msg.h>
#include<sys/types.h>
#include<unistd.h>
#include<errno.h>
#include<pthread.h>
#include<string.h>

#define ERR_EXIT(m) \

    do { \

        perror(m); \

        exit(EXIT_FAILURE); \

    } while(0)

void *routine(void *arg)

{

    int i;

    for (i = 0; i < 20; i++)

    {

        printf("B");

        fflush(stdout);

        usleep(20);

        /*

            if (i == 3)

                pthread_exit("ABC");

            */

    }

    return "DEF";

}

int main(void)

{

    pthread_t tid;

    int ret;

    if ((ret = pthread_create(&tid, NULL, routine, NULL)) != 0)

    {

        fprintf(stderr, "pthread create: %s\n", strerror(ret));

        exit(EXIT_FAILURE);

    }

    int i;

    for (i = 0; i < 20; i++)

    {

        printf("A");

        fflush(stdout);

        usleep(20);

    }

    void *value;

    if ((ret = pthread_join(tid, &value)) != 0)

    {

        fprintf(stderr, "pthread create: %s\n", strerror(ret));

        exit(EXIT_FAILURE);

    }

    printf("\n");

    printf("return msg=%s\n", (char *)value);

    return 0;

}
创建一个线程,主线程打印A,新线程打印B,主线程调用pthread_join 等待新线程退出,打印退出值。

simba@ubuntu:~/Documents/code/linux_programming/UNP/pthread$ ./pthread_create 

ABAABABABABABABABABABABABABAABABBABABABB

return msg=DEF

在新线程中也可调用pthread_exit 退出。

四、简单的多线程服务器端程序

在将socket 编程的时候曾经使用fork 多进程的方式来实现并发,现在尝试使用多线程方式来实现:

 C++ Code 
1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>

#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>

#define ERR_EXIT(m) \

        do \

        { \

                perror(m); \

                exit(EXIT_FAILURE); \

        } while(0)

void echo_srv(int conn)

{

    char recvbuf[1024];

    while (1)

    {

        memset(recvbuf, 0, sizeof(recvbuf));

        int ret = read(conn, recvbuf, sizeof(recvbuf));

        if (ret == 0)

        {

            printf("client close\n");

            break;

        }

        else if (ret == -1)

            ERR_EXIT("read");

        fputs(recvbuf, stdout);

        write(conn, recvbuf, ret);

    }
   close(conn);

}

void *thread_routine(void *arg)

{

    /* 主线程没有调用pthread_join等待线程退出 */

    pthread_detach(pthread_self()); //剥离线程,避免产生僵线程
    /*int conn = (int)arg;*/

    int conn = *((int *)arg);

    free(arg);

    echo_srv(conn);

    printf("exiting thread ...\n");

    return NULL;

}

int main(void)

{

    int listenfd;

    if ((listenfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)

        ERR_EXIT("socket");

    struct sockaddr_in servaddr;

    memset(&servaddr, 0, sizeof(servaddr));

    servaddr.sin_family = AF_INET;

    servaddr.sin_port = htons(5188);

    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);

    int on = 1;

    if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)

        ERR_EXIT("setsockopt");

    if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0)

        ERR_EXIT("bind");

    if (listen(listenfd, SOMAXCONN) < 0)

        ERR_EXIT("listen");

    struct sockaddr_in peeraddr;

    socklen_t peerlen = sizeof(peeraddr);

    int conn;

    while (1)

    {

        if ((conn = accept(listenfd, (struct sockaddr *)&peeraddr, &peerlen)) < 0)

            ERR_EXIT("accept");

        printf("ip=%s port=%d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));

        pthread_t tid;

        //      int ret;
        /*pthread_create(&tid, NULL, thread_routine, (void*)&conn);*/ // race condition问题,竟态问题
        int *p = malloc(sizeof(int));

        *p = conn;

        pthread_create(&tid, NULL, thread_routine, p);

        /*

                if ((ret = pthread_create(&tid, NULL, thread_routine, (void*)conn)) != 0) //64位系统时指针不是4个字节,不可移植

                {

                    fprintf(stderr, "pthread_create:%s\n", strerror(ret));

                    exit(EXIT_FAILURE);

                }

        */

    }
程序逻辑并不复杂,一旦accept 返回一个已连接套接字,就创建一个新线程对其服务,在每个新线程thread_routine 中调用pthread_detach 剥离线程,我们的主线程不能调用pthread_join 等待这些新线程的退出,因为还要返回while 循环开头去在accept 中阻塞监听。

如果使用pthread_create(&tid, NULL, thread_routine, (void*)&conn); 存在的问题是如果accept 再次返回一个已连接套接字,而此时thread_routine 函数还没取走conn 时,可能会读取到已经被更改的conn 值。

如果使用  pthread_create(&tid, NULL, thread_routine, (void*)conn); 存在的问题是在64位系统中指针不是4个字节而是8个字节,即不可移植 性。

使用上述未被注释的做法,每次返回一个conn,就malloc 一块内存存放起来,在thread_routine 函数中去读取即可。

开多个客户端,可以看到正常服务。

后记:其实 pthread 系列函数也可以应用于进程间加锁,怎么应用到多进程场合呢,被多个进程共享呢?

很简单,首先需要设置互斥锁的进程间共享属性:

 C++ Code 
1

2

3

4

int pthread_mutexattr_setpshared(pthread_mutexattr_t *mattr, int pshared);

pthread_mutexattr_t mattr;

pthread_mutexattr_init(&mattr);

pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
其次,为了达到多进程共享的需要,互斥锁对象需要创建在共享内存中。
最后,需要注意的是,并不是所有Linux系统都支持这个特性,程序里需要检查是否定义了_POSIX_SHARED_MEMORY_OBJECTS宏,只有定义了才能用这种方式实现进程间互斥锁。

参考:

《linux c 编程一站式学习》

《UNP》

《APUE》

http://www.ibm.com/developerworks/cn/linux/l-cn-mthreadps/index.html
http://www.ibm.com/developerworks/cn/linux/kernel/l-thread/
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: