用c实现一个阻塞的消息队列
2018-01-10 10:44
609 查看
该消息队列的实现完全符合 POSIX 标准。
实现了
1.读时,队列空,会阻塞
2.写时,队列满,会阻塞
为了保证线程安全,采用了互斥量;为了阻塞一定时间,采用了条件变量。二者一起用,实现了一个简单消息队列。
simple_queue.c
simple_queue.h
main.c
在 Linux 下面编译(例如:gcc main.c simple_queue.c -o simple_queue -lpthread)
再运行(例如:./simple_queue)
望大神指教
实现了
1.读时,队列空,会阻塞
2.写时,队列满,会阻塞
为了保证线程安全,采用了互斥量;为了阻塞一定时间,采用了条件变量。二者一起用,实现了一个简单消息队列。
simple_queue.c
#include<stdio.h> #include<stdlib.h> #include<string.h> #include<pthread.h> #include<sys/time.h> #include"simple_queue.h" pthread_cond_t msg_cond = PTHREAD_COND_INITIALIZER; simple_queue* create_simple_queue(const char* queue_name, int queue_length, int queue_type) { simple_queue *this = NULL; if (NULL == queue_name || 0 == queue_length) { printf("[%s] param is error\n", __FUNCTION__); return NULL; } this = (simple_queue*)malloc(sizeof(simple_queue) + queue_length * sizeof(void*)); if (NULL != this) { this->front = 0; this->rear = 0; this->length = queue_length; this->queue_type = queue_type; if (0 != pthread_mutex_init(&(this->data_mutex), NULL)) { printf("[%s]pthread_mutex_init failed!\n", __FUNCTION__); free(this); this = NULL; return NULL; } strcpy(this->queue_name, queue_name); } else { printf("[%s]malloc is failed!\n", __FUNCTION__); return NULL; } return this; } /* */ queue_status is_full_queue(simple_queue* p_queue) { queue_status ret = QUEUE_IS_NORMAL; do { if (NULL == p_queue) { printf("[%s] param is error\n", __FUNCTION__); ret = QUEUE_NO_EXIST; break; } if (p_queue->front == ((p_queue->rear + 1) % (p_queue->length))) { printf("[%s] queue is full\n", __FUNCTION__); ret = QUEUE_IS_FULL; break; } }while(0); return ret; } /* */ queue_status is_empty_queue(simple_queue* p_queue) { queue_status ret = QUEUE_IS_NORMAL; do { if (NULL == p_queue) { printf("[%s] param is error\n", __FUNCTION__); ret = QUEUE_NO_EXIST;; break; } if (p_queue->front == p_queue->rear) { printf("[%s] queue is empty\n", __FUNCTION__); ret = QUEUE_IS_EMPTY; break; } }while(0); return ret; } /* */ cntl_queue_ret push_simple_queue(simple_queue* p_queue, void* data) { int w_cursor = 0; if(NULL == p_queue || NULL == data) { printf("[%s] param is error\n", __FUNCTION__); return CNTL_QUEUE_PARAM_ERROR; } pthread_mutex_lock(&(p_queue->data_mutex)); w_cursor = (p_queue->rear + 1)%p_queue->length; if (w_cursor == p_queue->front) { struct timeval now; struct timespec tsp; gettimeofday(&now, NULL); 
tsp.tv_sec = now.tv_sec + 5; tsp.tv_nsec = now.tv_usec * 1000; if (0 != pthread_cond_timedwait(&msg_cond, &(p_queue->data_mutex), &tsp))//队列满,等待消息被抛出,如果5秒内,没有消息被抛出,就返回 { printf("[%s]: queue is full\n", __FUNCTION__); pthread_mutex_unlock(&(p_queue->data_mutex)); return CNTL_QUEUE_TIMEOUT; } w_cursor = (p_queue->rear + 1)%p_queue->length; } p_queue->data[p_queue->rear] = data; p_queue->rear = w_cursor; pthread_mutex_unlock(&(p_queue->data_mutex)); pthread_cond_signal(&msg_cond); //pthread_cond_broadcast(&msg_cond); return CNTL_QUEUE_SUCCESS; } /* */ cntl_queue_ret pop_simple_queue(simple_queue* p_queue, void** data) { if(NULL == p_queue) { printf("[%s] param is error\n", __FUNCTION__); return CNTL_QUEUE_PARAM_ERROR; } pthread_mutex_lock(&(p_queue->data_mutex)); if (p_queue->front == p_queue->rear) { struct timeval now; struct timespec tsp; gettimeofday(&now, NULL); tsp.tv_sec = now.tv_sec + 5; tsp.tv_nsec = now.tv_usec * 1000; if (0 != pthread_cond_timedwait(&msg_cond, &(p_queue->data_mutex), &tsp))//队列空,等待消息来临,如果5秒内,没有消息来,就返回 { printf("[%s]: queue is empty\n", __FUNCTION__); pthread_mutex_unlock(&(p_queue->data_mutex)); return CNTL_QUEUE_TIMEOUT; } } *data = p_queue->data[p_queue->front]; p_queue->front = (p_queue->front + 1)%p_queue->length; pthread_mutex_unlock(&(p_queue->data_mutex)); pthread_cond_signal(&msg_cond); //pthread_cond_broadcast(&msg_cond); return CNTL_QUEUE_SUCCESS; } cntl_queue_ret destroy_simple_queue(simple_queue* p_queue) { cntl_queue_ret ret = CNTL_QUEUE_SUCCESS; if(NULL == p_queue) { printf("[%s] param is error\n", __FUNCTION__); ret = CNTL_QUEUE_PARAM_ERROR; } else { pthread_mutex_destroy(&(p_queue->data_mutex)); pthread_cond_destroy(&msg_cond); while (p_queue->front != p_queue->rear)//删除队列中残留的消息 { free(p_queue->data[p_queue->front]); p_queue->front = (p_queue->front + 1)%p_queue->length; } free(p_queue); p_queue = NULL; } return ret; }
simple_queue.h
/*
 * simple_queue.h - public interface of a bounded, blocking queue of void*
 * payloads protected by a pthread mutex.
 */
#ifndef SIMPLE_QUEUE_H
#define SIMPLE_QUEUE_H

/* Needed for pthread_mutex_t so this header can be included on its own. */
#include <pthread.h>

/* Defaults used by the demo program. */
#define MQ_NAME "test"
#define MQ_FLAG_BLOCK QUEUE_BLOCK
#define MQ_FLAG_NO_BLOCK QUEUE_NO_BLOCK
#define MQ_LENGTH_MAX 10
#define MQ_SIZE_MAX 512

/*
 * Ring buffer of pointers. `data` is a flexible array member: the queue is
 * allocated as sizeof(simple_queue) + length * sizeof(void *).
 */
typedef struct _simple_queue
{
    int front;                  /* index of the next slot to pop */
    int rear;                   /* index of the next slot to push */
    int length;                 /* number of slots in data[] */
    int queue_type;             /* a queue_type value supplied at creation */
    pthread_mutex_t data_mutex; /* guards front, rear and data[] */
    char queue_name[15];        /* NUL-terminated label */
    void *data[];               /* `length` payload slots */
} simple_queue;

/* Example message payload used by the demo program. */
typedef struct _simple_queue_buf
{
    int msg_type;
    int msg_prio;
    char msg_buf[50];
} queue_buf;

/* Blocking mode requested when the queue is created. */
typedef enum _queue_type
{
    QUEUE_BLOCK = 0,
    QUEUE_NO_BLOCK,
} queue_type;

/* Result of is_full_queue() / is_empty_queue(). */
typedef enum _queue_status
{
    QUEUE_IS_NORMAL = 0,
    QUEUE_NO_EXIST,
    QUEUE_IS_FULL,
    QUEUE_IS_EMPTY,
} queue_status;

/* Result of push/pop/destroy. */
typedef enum _cntl_queue_ret
{
    CNTL_QUEUE_SUCCESS = 0,
    CNTL_QUEUE_FAIL,
    CNTL_QUEUE_TIMEOUT,
    CNTL_QUEUE_PARAM_ERROR,
} cntl_queue_ret;

/* Creates a queue with `queue_length` slots; returns NULL on failure. */
simple_queue *create_simple_queue(const char *queue_name, int queue_length, int queue_type);

/* Returns QUEUE_IS_FULL, QUEUE_IS_NORMAL, or QUEUE_NO_EXIST for NULL. */
queue_status is_full_queue(simple_queue *p_queue);

/* Returns QUEUE_IS_EMPTY, QUEUE_IS_NORMAL, or QUEUE_NO_EXIST for NULL. */
queue_status is_empty_queue(simple_queue *p_queue);

/* Enqueues `data`; may block while the queue is full. */
cntl_queue_ret push_simple_queue(simple_queue *p_queue, void *data);

/* Dequeues the oldest item into *data; may block while the queue is empty. */
cntl_queue_ret pop_simple_queue(simple_queue *p_queue, void **data);

/* Releases the queue and any items still stored in it. */
cntl_queue_ret destroy_simple_queue(simple_queue *p_queue);

#endif /* SIMPLE_QUEUE_H */
main.c
#include<stdio.h> #include<pthread.h> //#include<errno.h> #include<string.h> #include<stdlib.h> #include"simple_queue.h" void* send_msg_thread(void* arg) { #if 1 queue_buf* send_buf = NULL; send_buf = (queue_buf*)malloc(sizeof(queue_buf)); send_buf->msg_type = 9; send_buf->msg_prio = 6; strcpy(send_buf->msg_buf, "helloworld"); printf("first1: rear =%d font =%d\n", ((simple_queue*)arg)->rear, ((simple_queue*)arg)->front); printf("send_buf = %p\n", send_buf); if (push_simple_queue((simple_queue*)arg, (void*)send_buf) < 0) { printf("[%s]: push_simple_queue failed!\n", __FUNCTION__); return NULL; } printf("first2: rear =%d font =%d\n", ((simple_queue*)arg)->rear, ((simple_queue*)arg)->front); #if 1 queue_buf* send_buf1 = NULL; send_buf1 = (queue_buf*)malloc(sizeof(queue_buf)); send_buf1->msg_type = 9; send_buf1->msg_prio = 6; strcpy(send_buf1->msg_buf, "byebye"); printf("first1: rear =%d font =%d\n", ((simple_queue*)arg)->rear, ((simple_queue*)arg)->front); printf("send_buf1 = %p\n", send_buf1); if (push_simple_queue((simple_queue*)arg, (void*)send_buf1) < 0) { printf("[%s]: push_simple_queue\n", __FUNCTION__); return NULL; } printf("first2: rear =%d font =%d\n", ((simple_queue*)arg)->rear, ((simple_queue*)arg)->front); #endif #if 0 queue_buf* recv_buf = NULL; printf("recv_buf = %p\n", recv_buf); printf("seconde1 rear =%d font =%d\n", ((simple_queue*)arg)->rear, ((simple_queue*)arg)->front); if (CNTL_QUEUE_SUCCESS != pop_simple_queue((simple_queue*)arg, (void**)&recv_buf)) { printf("[%s]: push_simple_queue failed!\n", __FUNCTION__); return NULL; } printf("recv_buf = %p\n", recv_buf); printf("type = %d, prio = %d, recv_buf = %s\n", recv_buf->msg_type, recv_buf->msg_prio, recv_buf->msg_buf); printf("seconde2: rear =%d font =%d\n", ((simple_queue*)arg)->rear, ((simple_queue*)arg)->front); free(recv_buf); #endif #endif return NULL; } void* recv_msg_thread(void* arg) { #if 1 queue_buf* recv_buf = NULL; //usleep(500); printf("recv_buf = %p\n", recv_buf); printf("seconde1 
rear =%d font =%d\n", ((simple_queue*)arg)->rear, ((simple_queue*)arg)->front); if (CNTL_QUEUE_SUCCESS != pop_simple_queue((simple_queue*)arg, (void**)&recv_buf)) { printf("[%s]: pop_simple_queue failed!\n", __FUNCTION__); return NULL; } printf("recv_buf = %p\n", recv_buf); printf("type = %d, prio = %d, recv_buf = %s\n", recv_buf->msg_type, recv_buf->msg_prio, recv_buf->msg_buf); printf("seconde2: rear =%d font =%d\n", ((simple_queue*)arg)->rear, ((simple_queue*)arg)->front); free(recv_buf); recv_buf = NULL; //queue_buf* recv_buf = NULL; //usleep(500); #if 1 printf("recv_buf = %p\n", recv_buf); printf("seconde1 rear =%d font =%d\n", ((simple_queue*)arg)->rear, ((simple_queue*)arg)->front); if (CNTL_QUEUE_SUCCESS != pop_simple_queue((simple_queue*)arg, (void**)&recv_buf)) { printf("[%s]: pop_simple_queue failed!\n", __FUNCTION__); return NULL; } printf("recv_buf = %p\n", recv_buf); printf("type = %d, prio = %d, recv_buf = %s\n", recv_buf->msg_type, recv_buf->msg_prio, recv_buf->msg_buf); printf("seconde2: rear =%d font =%d\n", ((simple_queue*)arg)->rear, ((simple_queue*)arg)->front); free(recv_buf); recv_buf = NULL; printf("recv_buf = %p\n", recv_buf); #endif #if 0 queue_buf* send_buf1 = NULL; send_buf1 = (queue_buf*)malloc(sizeof(queue_buf)); send_buf1->msg_type = 9; send_buf1->msg_prio = 6; strcpy(send_buf1->msg_buf, "byebye"); printf("first1: rear =%d font =%d\n", ((simple_queue*)arg)->rear, ((simple_queue*)arg)->front); printf("send_buf1 = %p\n", send_buf1); if (push_simple_queue((simple_queue*)arg, (void*)send_buf1) < 0) { printf("[%s]: push_simple_queue\n", __FUNCTION__); return NULL; } printf("first2: rear =%d font =%d\n", ((simple_queue*)arg)->rear, ((simple_queue*)arg)->front); #endif #endif return NULL; } int main(int argc, char* argv[]) { int ret = 0; pthread_t send_thread_id = 0; pthread_t recv_thread_id = 0; simple_queue* msg_queue = NULL; #if 1 msg_queue = create_simple_queue(MQ_NAME, MQ_LENGTH_MAX, MQ_FLAG_BLOCK); #else msg_queue = 
create_simple_queue(MQ_NAME, MQ_LENGTH_MAX, MQ_FLAG_NO_BLOCK); #endif if (NULL == msg_queue) { printf("[%s]: create simple queue failed!\n", __FUNCTION__); return -1; } ret = pthread_create(&send_thread_id, NULL, send_msg_thread, (void*)msg_queue); if (0 != ret) { printf("[%s]: create send thread failed!\n", __FUNCTION__); return -1; } ret = pthread_create(&recv_thread_id, NULL, recv_msg_thread, (void*)msg_queue); if (0 != ret) { printf("[%s]: create recv thread failed!\n", __FUNCTION__); return -1; } printf("begin join\n"); pthread_join(send_thread_id, NULL); pthread_join(recv_thread_id, NULL); printf("end join\n"); ret = destroy_simple_queue(msg_queue); if (CNTL_QUEUE_SUCCESS != ret) { printf("[%s]: destroy simple queue failed!\n", __FUNCTION__); return -1; } return 0; }
在 Linux 下面编译(例如:gcc main.c simple_queue.c -o simple_queue -lpthread)
再运行(例如:./simple_queue)
望大神指教
相关文章推荐
- nodejs一个函数实现消息队列中间件
- 【JAVA】简单实现一个阻塞任务队列
- 一个Redis消息队列实现
- JDK源码分析之主要阻塞队列实现类ArrayBlockingQueue -- java消息队列/java并发编程/阻塞队列
- 用一个消息队列(System V)实现客户端-服务器端
- 消息队列实现从一个进程向另一个进程发送一个数据块的方法
- 用redis阻塞队列,实现消息传递
- Java编程的逻辑 (61) - 内存映射文件及其应用 - 实现一个简单的消息队列
- 一个简单的阻塞队列实现
- 使用 ReentrantLock 和 Condition 实现一个阻塞队列
- 12-2-多condition实现可阻塞消息队列
- 利用主线程与子线程间的消息通讯,实现任务处理队列.子线程中创建不会阻塞执行的窗口
- javaweb简单实现前台请求的非阻塞消息队列(ConcurrentLinkedQueue)
- 一个消息队列类的实现C++
- 聊聊高并发(十四)理解Java中的管程,条件队列,Condition以及实现一个阻塞队列
- 基于Redis实现分布式消息队列
- 使用两个队列实现一个栈
- 分享一个c#写的开源分布式消息队列equeue
- 通过消息队列实现两进程间通信
- 两个队列实现一个栈