您的位置:首页 > 其它

POSIX多线程程序设计(第4章:使用线程的几种方式)

2015-08-25 16:56 405 查看

1概述

线程编程模型有以下三种方式:

模型
说明
流水线
每个线程反复地在数据系列集上执行同一种操作,并把操作结果传递给下一步骤的其他线程,这就是“流水线”方式。即流水线上的线程对数据执行同一种操作,如简单的加1,再把数据的操作结果传递给下一个线程,直到流水线上的最后一个线程。
工作组
每个线程在自己的数据上执行操作,工作组中的线程可能执行相同或不同的操作,但一定独立执行。即线程作为工作组的成员,为完成一项工作而分工进行。如查找一个目录下包含某个字符串的文件有几个,可以让工作组的每个成员线程分别去查一个文件。
客户端/服务器
一个客户为每一件工作向一个独立的服务器提交请求(如读或写),由服务器来处理这些(如读或写)请求。

2流水线



#include "errors.h"
#include <pthread.h>
#include <sys/prctl.h>

/*线程编程模型: 流水线模型*/

/*流水线成员线程处理的数据*/
typedef struct stage_tag
{
pthread_mutex_t mutex;/*线程数据互斥量*/
pthread_cond_t avail;/*线程是否可用的条件变量*/
pthread_cond_t ready;
pthread_t thread;/*线程ID*/
int data_ready;/*1:表示数据已准备好;0:表示数据处理完成或未准备好*/
long data;/*线程数据*/
struct stage_tag *next;
}stage_t;

/*数据集系列*/
typedef struct pipe_tag
{
pthread_mutex_t mutex;/*线程链表互斥量*/
stage_t *head;/*指向流水线第一个线程要处理的数据*/
stage_t *tail;/*指向最后一个数据,即最终的数据处理结果*/
int stages;/*线程总的个数*/
int active;/*流水线上处理的数据的个数*/
}pipe_t;

int pipe_send(stage_t *stage, long data)
{
int status;

pthread_mutex_lock(&stage->mutex);

#if 1 /*线程上一个数据还未处理完成时,需要等待*/
while (stage->data_ready)
{
status = pthread_cond_wait(&stage->ready, &stage->mutex);
if (status != 0)
{
pthread_mutex_unlock(&stage->mutex);
return status;
}
}
#endif
stage->data = data;
stage->data_ready = 1;
/*数据已准备好,通知等待的线程开始处理数据*/
status = pthread_cond_signal(&stage->avail);
if (status != 0)
{
pthread_mutex_unlock(&stage->mutex);
return status;
}

status = pthread_mutex_unlock(&stage->mutex);

return status;
}

void *pipe_stage(void *arg)
{
int status;
stage_t *stage = (stage_t *)arg;
stage_t *next_stage = stage->next;

/*线程命名*/
#if 0
static int i = 0;
char thread_name[20];
sprintf(thread_name, "thread_%d", i++);
prctl(PR_SET_NAME, ( unsigned long )thread_name);
#endif
status = pthread_mutex_lock(&stage->mutex);
if (status != 0)
{
err_abort(status, "lock pipe stage");
}

while (1)
{
while (stage->data_ready != 1)
{
status = pthread_cond_wait(&stage->avail, &stage->mutex);
if (status != 0)
{
err_abort(status, "wait for previous stage");
}
/*将数据发送给下一个线程处理*/
pipe_send(next_stage, stage->data + 1);
stage->data_ready = 0;
#if 1 /*线程对数据的处理已经完成,通知等待的线程*/
status = pthread_cond_signal(&stage->ready);
if (status != 0)
{
err_abort(status, "wake next stage");
}
#endif
}
}
}

int pipe_create(pipe_t *pipe, int stages)
{
int status;
int pipe_index;
stage_t *new_stage, *stage;
stage_t **link = &pipe->head;

status = pthread_mutex_init(&pipe->mutex, NULL);
if (status != 0)
{
err_abort(status, "init pipe mutex");
}

pipe->stages = stages;/*线程个数*/
pipe->active = 0;

/*生成线程处理的数据集系列,多创建一个额外的数据stage*/
for (pipe_index = 0; pipe_index <= stages; pipe_index++)
{
new_stage = (stage_t *)malloc(sizeof(stage_t));
if (new_stage == NULL)
{
errno_abort("allocate stage");
}
status = pthread_mutex_init(&new_stage->mutex, NULL);
if (status != 0)
{
err_abort(status, "init stage mutex");
}

status = pthread_cond_init(&new_stage->avail, NULL);
if (status != 0)
{
err_abort(status, "init avail cond");
}

status = pthread_cond_init(&new_stage->ready, NULL);
if (status != 0)
{
err_abort(status, "init ready cond");
}

new_stage->data_ready = 0;
*link = new_stage;
link = &new_stage->next;
}

*link = (stage_t *)NULL;
pipe->tail = new_stage;/*将第11个stage赋值给pipe的tail*/

/*创建10个线程,第10个线程还处理了最后的stage*/
for (stage = pipe->head; stage->next != NULL; stage = stage->next)
{
status = pthread_create(&stage->thread, NULL, pipe_stage, (void *)stage);
if (status != 0)
{
err_abort(status, "create pipe stage");
}
}

return 0;
}

int pipe_start(pipe_t *pipe, long value)
{
int status;

pthread_mutex_lock(&pipe->mutex);
pipe->active++;/*流水线处理了几个数据*/
pthread_mutex_unlock(&pipe->mutex);

pipe_send(pipe->head, value);

return 0;
}

int pipe_result(pipe_t *pipe, long *result)
{
stage_t *tail = pipe->tail;
long value;
int empty = 0;
int status;
pthread_mutex_lock(&pipe->mutex);

if (pipe->active <= 0)
empty = 1;
else
pipe->active--;

pthread_mutex_unlock(&pipe->mutex);

/*流水线上为空则直接退出*/
if (empty)
return 0;

pthread_mutex_lock(&tail->mutex);
/*等待最后一个数据stage处理完*/
while (!tail->data_ready)
pthread_cond_wait(&tail->avail, &tail->mutex);

*result = tail->data;
tail->data_ready = 0;
pthread_cond_signal(&tail->ready);

pthread_mutex_unlock(&tail->mutex);

return 1;
}

int main(int argc, char *argv[])
{
pipe_t my_pipe;
long value, result;

int status;
char line[128];

pipe_create(&my_pipe, 10);
printf("Enter integer values, or \"=\" for next result\n");

while (1)
{
printf("Data> ");
if (fgets(line, sizeof(line), stdin) == NULL)
exit(0);

if (strlen(line) <= 1)
continue;

if (strlen(line) <= 2 && line[0] == '=')
{
if (pipe_result(&my_pipe, &result))
printf("Result is %ld\n", result);
else
printf("Pipe is empty\n");
}
else
{
if (sscanf(line, "%ld", &value) < 1)
fprintf(stderr, "Enter an integer value\n");
else
pipe_start(&my_pipe, value);
}
}

return 0;
}


3工作组



/*工作组模型*/
#include <sys/types.h>
#include <pthread.h>
#include <sys/stat.h>
#include <dirent.h>
#include "errors.h"

#define CREW_SIZE 4

typedef struct work_tag
{
struct work_tag *next;
char *path;
char *string;
}work_t, *work_p;

typedef struct worker_tag
{
int index;
pthread_t thread;
struct crew_tag *crew;
}worker_t, *worker_p;

typedef struct crew_tag
{
int crew_size;
worker_t crew[CREW_SIZE];
long work_count;
work_t *first, *last;
pthread_mutex_t mutex;
pthread_cond_t done;
pthread_cond_t go;
}crew_t, *crew_p;

size_t path_max;
size_t name_max;

void *worker_routine(void *arg)
{
worker_p mine = (worker_t *)arg;
crew_p crew = mine->crew;
work_p work,new_work;
struct stat filestat;
struct dirent *entry;
int status;

entry = (struct dirent *)malloc(sizeof(struct dirent) + name_max);
if (entry == NULL)
errno_abort("allocating dirent");

pthread_mutex_lock(&crew->mutex);

/*队列中没有work,需要等待*/
while (crew->work_count == 0)
{
pthread_cond_wait(&crew->go, &crew->mutex);
}
pthread_mutex_unlock(&crew->mutex);

while (1)
{
pthread_mutex_lock(&crew->mutex);

/*没有work,需要等待*/
while (crew->first == NULL)
{
pthread_cond_wait(&crew->go, &crew->mutex);
}

work = crew->first;
crew->first = work->next;
if (crew->first == NULL)
{
crew->last = NULL;
}

pthread_mutex_unlock(&crew->mutex);

/*降序遍历目录层次结构*/
status = lstat(work->path, &filestat);

/*符号链接*/
if (S_ISLNK(filestat.st_mode))
{
printf("thread %d: %s is a link, skipping.\n", mine->index, work->path);
}
/*目录文件*/
else if (S_ISDIR(filestat.st_mode))
{
DIR *dir;
struct dirent *result;

/*打开目录*/
dir = opendir(work->path);
if (dir == NULL)
{
fprintf(stderr, "Unable to open directory %s: %d (%s)\n", work->path, errno, strerror(errno));
continue;
}

while (1)
{
/*readdir_r线程安全函数,result返回目录项指针,entry返回下一个目录项*/
status = readdir_r (dir, entry, &result);
if (status != 0)
{
fprintf (stderr,"Unable to read directory %s: %d (%s)\n", work->path, status, strerror (status));
break;
}
if (result == NULL)
break;

if (strcmp (entry->d_name, ".") == 0)
continue;
if (strcmp (entry->d_name, "..") == 0)
continue;
new_work = (work_p)malloc(sizeof (work_t));
if (new_work == NULL)
errno_abort ("Unable to allocate space");
new_work->path = (char*)malloc(path_max);
if (new_work->path == NULL)
errno_abort ("Unable to allocate path");

strcpy (new_work->path, work->path);
strcat (new_work->path, "/");
strcat (new_work->path, entry->d_name);
new_work->string = work->string;
new_work->next = NULL;

status = pthread_mutex_lock (&crew->mutex);
if (status != 0)
err_abort (status, "Lock mutex");

/*将new_work插入first队列的尾端,并将last指向最后一个new_work*/
if (crew->first == NULL)
{
crew->first = new_work;
crew->last = new_work;
}
else
{
crew->last->next = new_work;
crew->last = new_work;
}

crew->work_count++;
pthread_cond_signal(&crew->go);
pthread_mutex_unlock(&crew->mutex);

}

closedir (dir);
}
/*普通文件*/
else if (S_ISREG(filestat.st_mode))
{
FILE *search;
char buffer[256], *bufptr, *search_ptr;
search = fopen (work->path, "r");
if (search == NULL)
{
fprintf(stderr, "Unable to open %s: %d (%s)\n", work->path, errno, strerror (errno));
}
else
{
while (1)
{
bufptr = fgets(buffer, sizeof (buffer), search);
if (bufptr == NULL)
{
if (feof (search))
break;
if (ferror (search))
{
fprintf(stderr, "Unable to read %s: %d (%s)\n", work->path, errno, strerror(errno));
break;
}
}

search_ptr = strstr(buffer, work->string);
if (search_ptr != NULL)
{
/*文件锁*/
flockfile (stdout);
printf("Thread %d found \"%s\" in %s\n", mine->index, work->string, work->path);
#if 0
printf ("%s\n", buffer);
#endif
/**/
funlockfile (stdout);
break;
}
}
fclose (search);
}
}
else
{
fprintf(stderr, "Thread %d: %s is type %o (%s))\n", mine->index, work->path, filestat.st_mode & S_IFMT,
(S_ISFIFO (filestat.st_mode) ? "FIFO" :
(S_ISCHR (filestat.st_mode) ? "CHR" :
(S_ISBLK (filestat.st_mode) ? "BLK" :
(S_ISSOCK (filestat.st_mode) ? "SOCK" : "unknown")))));
}

free(work->path);
free (work);

pthread_mutex_lock (&crew->mutex);

crew->work_count--;

if (crew->work_count <= 0)
{
pthread_cond_broadcast (&crew->done);
pthread_mutex_unlock (&crew->mutex);
break;
}

pthread_mutex_unlock (&crew->mutex);

}

free (entry);
return NULL;
}

int crew_start(crew_p crew, char *filepath, char *search)
{
work_p request;
int status;

pthread_mutex_lock(&crew->mutex);

while (crew->work_count > 0)
{
pthread_cond_wait(&crew->done, &crew->mutex);
}

errno = 0;
/*获取相对路径名的最大字节数,包括终止符NULL*/
path_max = pathconf(filepath, _PC_PATH_MAX);
if (path_max == -1)
{
if (errno == 0)
path_max = 1024;/*返错且不设置errno,则说明无限制*/
else
errno_abort("Unable to get PATH_MAX");
}

errno = 0;
/*获取文件名的最大字节数,不包括终止符NULL*/
name_max = pathconf(filepath, _PC_NAME_MAX);/**/
if (name_max == -1)
{
if (errno == 0)
name_max = 256;
else
errno_abort("Unable to get NAME_MAX");
}

name_max++;/*加上终止符的字节*/

request = (work_p)malloc(sizeof(work_t));
if (request == NULL)
errno_abort("unable to allocate request");

request->path = (char *)malloc(path_max);
if (request->path == NULL)
errno_abort("unable to allocate path");
strcpy(request->path, filepath);
request->string = search;
request->next = NULL;

/*将request插入到first的尾端*/
if (crew->first == NULL)
{
crew->first = request;
crew->last = request;
}
else
{
crew->last->next = request;
crew->last = request;
}

crew->work_count++;

status = pthread_cond_signal(&crew->go);
if (status != 0)
{
free(crew->first);
crew->first = NULL;
crew->work_count = 0;
pthread_mutex_unlock(&crew->mutex);

return status;
}

while (crew->work_count > 0)
{
pthread_cond_wait(&crew->done, &crew->mutex);
}

pthread_mutex_unlock(&crew->mutex);

return 0;
}

/*创建工作组*/
int crew_create(crew_t *crew, int crew_size)
{
int crew_index;
int status;

if (crew_size > CREW_SIZE)
return EINVAL;

crew->crew_size = crew_size;
crew->work_count = 0;
crew->first = NULL;
crew->last = NULL;

status = pthread_mutex_init(&crew->mutex, NULL);
if (status != 0)
return status;
status = pthread_cond_init(&crew->done, NULL);
if (status != 0)
return status;
status = pthread_cond_init(&crew->go, NULL);
if (status != 0)
return status;

/*创建四个线程*/
for (crew_index = 0; crew_index < CREW_SIZE; crew_index++)
{
crew->crew[crew_index].index = crew_index;
crew->crew[crew_index].crew = crew;/*回指*/
status = pthread_create(&crew->crew[crew_index].thread, NULL, worker_routine, (void *)&crew->crew[crew_index]);
if (status != 0)
err_abort(status, "create worker");
}

return 0;
}

int main(int argc, char *argv[])
{
crew_t my_crew;
char line[128], *next;
int status;

if (argc < 3)
{
fprintf(stderr, "Usage: %s string path\n", argv[0]);
return -1;
}

/*创建工作组*/
status = crew_create(&my_crew, CREW_SIZE);
if (status != 0)
err_abort(status, "create crew");

/*工作组开始运行*/
status = crew_start(&my_crew, argv[2], argv[1]);
if (status != 0)
err_abort(status, "start crew");

return 0;
}

4客户端/服务器



#include <pthread.h>
#include <math.h>
#include "errors.h"

#define CLIENT_THREADS 4 /* Number of clients */

#define REQ_READ 1 /* Read with prompt */
#define REQ_WRITE 2 /* Write */
#define REQ_QUIT 3 /* Quit server */

typedef struct request_tag {
struct request_tag *next; /* Link to next */
int operation; /* Function code */
int synchronous; /* Non-zero if synchronous */
int done_flag; /* Predicate for wait */
pthread_cond_t done; /* Wait for completion */
char prompt[32]; /* Prompt string for reads */
char text[128]; /* Read/write text */
} request_t;

typedef struct tty_server_tag {
request_t *first;
request_t *last;
int running;
pthread_mutex_t mutex;
pthread_cond_t request;
} tty_server_t;

tty_server_t tty_server = {
NULL, NULL, 0,
PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER};

int client_threads;
pthread_mutex_t client_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t clients_done = PTHREAD_COND_INITIALIZER;

void *tty_server_routine (void *arg)
{
static pthread_mutex_t prompt_mutex = PTHREAD_MUTEX_INITIALIZER;
request_t *request;
int operation, len;
int status;

while (1) {
status = pthread_mutex_lock (&tty_server.mutex);
if (status != 0)
err_abort (status, "Lock server mutex");

while (tty_server.first == NULL) {
status = pthread_cond_wait (
&tty_server.request, &tty_server.mutex);
if (status != 0)
err_abort (status, "Wait for request");
}
request = tty_server.first;
tty_server.first = request->next;
if (tty_server.first == NULL)
tty_server.last = NULL;
status = pthread_mutex_unlock (&tty_server.mutex);
if (status != 0)
err_abort (status, "Unlock server mutex");

operation = request->operation;
switch (operation) {
case REQ_QUIT:
break;
case REQ_READ:
if (strlen (request->prompt) > 0)
printf (request->prompt);
if (fgets (request->text, 128, stdin) == NULL)
{
request->text[0] = '\0';
}

len = strlen (request->text);
if (len > 0 && request->text[len-1] == '\n')
{
request->text[len-1] = '\0';
}
break;
case REQ_WRITE:
puts (request->text);
break;
default:
break;
}
if (request->synchronous) {
status = pthread_mutex_lock (&tty_server.mutex);
if (status != 0)
err_abort (status, "Lock server mutex");
request->done_flag = 1;
status = pthread_cond_signal (&request->done);
if (status != 0)
err_abort (status, "Signal server condition");
status = pthread_mutex_unlock (&tty_server.mutex);
if (status != 0)
err_abort (status, "Unlock server mutex");
} else
free (request);
if (operation == REQ_QUIT)
break;
}
return NULL;
}

void tty_server_request (
int operation,
int sync,
const char *prompt,
char *string)
{
request_t *request;
int status;

status = pthread_mutex_lock (&tty_server.mutex);
if (status != 0)
err_abort (status, "Lock server mutex");
if (!tty_server.running) {
pthread_t thread;
pthread_attr_t detached_attr;

status = pthread_attr_init (&detached_attr);
if (status != 0)
err_abort (status, "Init attributes object");
status = pthread_attr_setdetachstate (
&detached_attr, PTHREAD_CREATE_DETACHED);
if (status != 0)
err_abort (status, "Set detach state");
tty_server.running = 1;
status = pthread_create (&thread, &detached_attr,
tty_server_routine, NULL);
if (status != 0)
err_abort (status, "Create server");

pthread_attr_destroy (&detached_attr);
}

request = (request_t*)malloc (sizeof (request_t));
if (request == NULL)
errno_abort ("Allocate request");
request->next = NULL;
request->operation = operation;
request->synchronous = sync;
if (sync) {
request->done_flag = 0;
status = pthread_cond_init (&request->done, NULL);
if (status != 0)
err_abort (status, "Init request condition");
}
if (prompt != NULL)
strncpy (request->prompt, prompt, 32);
else
request->prompt[0] = '\0';
if (operation == REQ_WRITE && string != NULL)
strncpy (request->text, string, 128);
else
request->text[0] = '\0';

if (tty_server.first == NULL)
{
tty_server.first = request;
tty_server.last = request;
}
else
{
(tty_server.last)->next = request;
tty_server.last = request;
}

status = pthread_cond_signal (&tty_server.request);
if (status != 0)
err_abort (status, "Wake server");

if (sync) {
while (!request->done_flag) {
status = pthread_cond_wait (
&request->done, &tty_server.mutex);
if (status != 0)
err_abort (status, "Wait for sync request");
}
if (operation == REQ_READ) {
if (strlen (request->text) > 0)
strcpy (string, request->text);
else
string[0] = '\0';
}
status = pthread_cond_destroy (&request->done);
if (status != 0)
err_abort (status, "Destroy request condition");
free (request);
}
status = pthread_mutex_unlock (&tty_server.mutex);
if (status != 0)
err_abort (status, "Unlock mutex");
}

void *client_routine (void *arg)
{
int my_number = (int)arg, loops;
char prompt[32];
char string[128], formatted[128];
int status;

sprintf (prompt, "Client %d> ", my_number);
while (1) {
/*向服务器线程发送读请求*/
tty_server_request (REQ_READ, 1, prompt, string);
if (strlen(string) == 0)
{
printf("enter nothing ...\n");
break;
}
for (loops = 0; loops < 4; loops++) {
sprintf(formatted, "(%d#%d) %s", my_number, loops, string);
/*向服务器线程发送写请求*/
tty_server_request (REQ_WRITE, 0, NULL, formatted);
sleep(1);
}
}
status = pthread_mutex_lock (&client_mutex);
if (status != 0)
err_abort (status, "Lock client mutex");
client_threads--;
if (client_threads <= 0) {
status = pthread_cond_signal (&clients_done);
if (status != 0)
err_abort (status, "Signal clients done");
}
status = pthread_mutex_unlock (&client_mutex);
if (status != 0)
err_abort (status, "Unlock client mutex");
return NULL;
}

int main (int argc, char *argv[])
{
pthread_t thread;
int count;
int status;

/*创建4个客户线程*/
client_threads = CLIENT_THREADS;
for (count = 0; count < client_threads; count++) {
status = pthread_create (&thread, NULL,
client_routine, (void*)count);
if (status != 0)
err_abort (status, "Create client thread");
}
status = pthread_mutex_lock (&client_mutex);
if (status != 0)
err_abort (status, "Lock client mutex");
while (client_threads > 0) {
status = pthread_cond_wait (&clients_done, &client_mutex);
if (status != 0)
err_abort (status, "Wait for clients to finish");
}
status = pthread_mutex_unlock (&client_mutex);
if (status != 0)
err_abort (status, "Unlock client mutex");
printf ("All clients done\n");
tty_server_request (REQ_QUIT, 1, NULL, NULL);
return 0;
}




内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: