您的位置:首页 > Web前端

缓冲技术之二:缓冲池BufferPool的简单实现

2017-11-26 18:17 531 查看
在文章缓冲技术中提到无论是单缓冲、双缓冲或循环缓冲,均仅是进程专属缓冲配备,而一旦考虑到操作系统的分时并行特性,任一时刻只有一个进程的缓冲体系在工作之中,而其他进程的缓冲体系并不在工作(要么是迁移到swap cache外设磁盘上要么是占据物理内存,显著减少可用物理内存容量),所以采用公用缓冲池架构为多个并发进程提供缓冲服务是当前主流的手段(操作系统、数据库)。

由于完整地提供操作系统级别的缓冲池涉及到较多的内核知识,复杂度较高,出于演示说明的目的,所以本文通过一个为单个进程中的多线程提供缓冲服务的程序demo进行分析。



0. 可扩展数组实现的队列模板

对于缓冲池而言,其主要由3种不同类型的缓冲区构成:空闲缓冲区、装满输入数据的缓冲区和装满输出数据的缓冲区,分别通过3个队列进行管理:空缓冲区队列emq, 装满输入数据的缓冲区队列inq, 装满输出数据的缓冲区队列outq。故而合理的队列实现是构成缓冲池管理各buffer块的基础,这里通过可扩展容量的数组实现队列模板。

queue.h

#pragma once
#include <cassert>
#include <cstring>

namespace _tool
{
template <typename T>
class queue
{
protected:
int mFront,mEnd,mCapacity; //分别定义该队列的头(第一个有效数据Buffer的位置),尾(下一个待插入的空闲位置),mCapacity(当前队列的最大容量)
T *mpData; //指向Buffer块连续数组的数组指针

/*扩展当前buffer块数组的容量2倍*/
virtual void double_resize(void)
{
T *tmp = new T[this->mCapacity*2];
int size = this->size();
for (int i=0; i<size; i++)
{
tmp[i] = this->mpData[(this->mFront+i)%this->mCapacity];
}
delete[] mpData;
this->mpData = tmp;
this->mCapacity*=2;
this->mEnd = size;
this->mFront = 0;
}

public:
/*默认初始化函数*/
queue(void)
{
const int INIT_SIZE = 10;
this->mCapacity = INIT_SIZE;
this->mFront = this->mEnd = 0;
this->mpData = new T[this->mCapacity];
}
/*“引用传递”初始化函数*/
queue(const queue &from)
{
this->mCapacity = from.mCapacity;
this->mFront = from.mFront;
this->mEnd = from.mEnd;
this->mpData = new T[this->mCapacity];
for (int i=0; i<this->mCapacity; i++)
{
this->mpData[i] = from.mpData[i];//是否是深拷贝依旧取决于T数据类型的具体实现
}
}

virtual ~queue(void)
{
if (this->mpData)
{
delete[] this->mpData;
}
this->mpData = 0;
}

/*往队列中压入新数据*/
virtual void push_back(T value)
{
if (this->size() >= this->mCapacity-1)
{
this->double_resize();
}
this->mpData[this->mEnd++] = value;
this->mEnd %= this->mCapacity;
}

/*取出队列的首部元素*/
virtual const T pop_front(void)
{
assert(!this->empty()); //输出断言,如果队列是空,则弹出断言,如果非空,则弹出队列首部
T tmp = this->mpData[this->mFront++];
this->mFront %= this->mCapacity;
return tmp;
}

/*获取队列的首部元素,但不弹出*/
virtual const T &get_front(void) const
{
assert(!this->empty());
return this->mpData[this->mFront];
}

/*获取队列中当有效数据规模size*/
virtual int size(void) const
{
return (this->mEnd+this->mCapacity-this->mFront) % this->mCapacity;
}

virtual bool empty(void) const
{
return this->size()==0;
}

const queue &operator =(const queue &from) //引用传递
{
if (this->mpData)
{
delete[] this->mpData;
}
this->mpData = 0;
this->mCapacity = from.mCapacity;
this->mFront = from.mFront;
this->mEnd = from.mEnd;
this->mpData = new T[this->mCapacity];
for (int i=0; i<this->mCapacity; i++)
{
this->mpData[i] = from.mpData[i]; //此处可以看出队列的深拷贝还浅拷贝是由子元素自己实现的操作符函数的实现决定的
}
return *this;
}
};

/*fixed_queue是在queue基础上继承而来的子类,fixed_queue子类并不支持queue的动态可扩展*/
template <typename T>
class fixed_queue : public queue<T>
{
public:
fixed_queue(int size = 10)
{
this->mCapacity = size+1;
this->mFront = this->mEnd = 0;
this->mpData = new T[this->mCapacity];
}

/*引用传递初始化函数*/
fixed_queue(const fixed_queue &from)
{
this->mCapacity = from.mCapacity;
this->mFront = from.mFront;
this->mEnd = from.mEnd;
this->mpData = new T[this->mCapacity];
for (int i=0; i<this->mCapacity; i++)
{
this->mpData[i] = from.mpData[i];
}
}

virtual ~fixed_queue(void)
{
if (this->mpData)
{
delete[] this->mpData;
}
this->mpData = 0;
}

virtual void push_back(T value)
{
assert(!this->full());
this->mpData[this->mEnd++] = value;
this->mEnd %= this->mCapacity;
}
virtual bool full(void)
{
return (this->size() >= this->mCapacity-1);
}

const fixed_queue &operator =(const fixed_queue &right)
{
if (this->mpData)
{
delete[] this->mpData;
}
this->mpData = 0;
this->mCapacity = right.mCapacity;
this->mFront = right.mFront;
this->mEnd = right.mEnd;
this->mpData = new T[this->mCapacity];
for (int i=0; i<this->mCapacity; i++)
{
this->mpData[i] = right.mpData[i];
}
return *this;
}

/*如果fixed_queue实在容量不够,那么必须显式指明新的size参数*/
void resize(int size)
{
if (this->mpData)
{
delete[] this->mpData;
}
this->mpData = 0;
this->mCapacity = size+1;
this->mFront = this->mEnd = 0;
this->mpData = new T[this->mCapacity];

}
/*修改从父类queue中继承的double_resize()动态扩展函数*/
protected:
void double_resize(void)
{
assert(!"Don't call me!!");
}
};
}




1. BufferPool对外提供的API和具体实现

缓冲池的主要构成是三种队列:空闲缓冲块队列EM、装满输入数据的缓冲块队列IN、装满输出数据待处理的缓冲块队列OUT

缓冲池的工作方式主要是4种:收容输入、提取输入、收容输出、提取输出;

BufferPool.h

#pragma once
#include "queue.h"
#include <windows.h>

class IOBuffer
{
public:
/*通过枚举数据结构事先声明三种不同的队列,分别是空闲缓冲块队列、输入数据缓冲块队列、输出数据缓冲块队列*/
enum eBuffType
{
BT_EM = 0,
BT_IN = 1,
BT_OUT = 2,
};

IOBuffer(int buffer_pool_capacity, int each_buffer_size);
~IOBuffer(void);

//4个对外的函数
int accept_in(void);//为了方便实现,只返回一个缓冲区,需要fill_in装入输入缓冲队列,和fill_in一起构成收容输入
int accept_out(void);//为了方便实现,只返回一个缓冲区,需要fill_out装入输出缓冲队列,和fill_out一起构成收容输出
void fill_in(int number);//将缓冲块buffer装载输入数据后放入输入缓冲区队列
void fill_out(int number);//将缓冲区buffer装载要输出的数据然后放入输出缓冲区队列

//2个提取装满数据的缓冲块的函数
void distill_in(void);//对应提取输入
void distill_out(void);//对应提取输出

_tool::fixed_queue<char> &thebuffer(int number); //返回指定index的buffer的引用

private:
IOBuffer(const IOBuffer &from);

int get_buff(int type);//已处理互斥问题
void put_buf(int type, int work_buf);//已处理互斥问题

/*为了简化操作,直接在当前进程空间的堆中申明一块buffer_pool_capacity * each_buffer_size大小的空间
作为缓冲池的物理空间使用,并用mpBuffers指针指向该空间的首地址*/
_tool::fixed_queue<char> *mpBuffers;

//3个队列,之所以模板类采用int具象化,是因为和后续的设计配合在一起,我们是通过数组批量划分缓冲空间的,
//故而每个缓冲块都对应缓冲空间数组中的具体索引Index,出于简化管理,之所以管理各缓冲块的Index即可
_tool::queue<int> mqEM;
_tool::queue<int> mqIN;
_tool::queue<int> mqOUT;

HANDLE mhEM;//队列容量的信号量
HANDLE mhIN;//队列容量的信号量
HANDLE mhOUT;//队列容量的信号量
HANDLE mhoEM;//队列访问权的信号量
HANDLE mhoIN;//队列访问权的信号量
HANDLE mhoOUT;//队列访问权的信号量
int mBufferID;
static int mstaTotalBuffers;
};


BufferPool.cpp

#include "BufferPool.h"
#include <sstream>
#include <iostream>

using namespace _tool;

extern HANDLE OUTPUT_MUTEX;
int IOBuffer::mstaTotalBuffers = 0;

IOBuffer::IOBuffer(int buffer_pool_capacity, int each_buffer_size)
{
/*初始化缓冲池,缓冲池中buffer缓冲块数目总共有buffer_pool_capacity参数指定,
每个缓冲块buffer含有each_buffer_size个字节*/
this->mpBuffers = new fixed_queue<char>[buffer_pool_capacity];
for (int i=0; i<buffer_pool_capacity; i++)
{
this->mpBuffers[i].resize(each_buffer_size);
}

//记录下该缓冲池的编号
this->mBufferID = ++IOBuffer::mstaTotalBuffers;
std::stringstream ss;//设置string对象,用于后续提供内核的唯一性命名
ss << "BufferPool NO." << IOBuffer::mstaTotalBuffers;

/*设置以下3个互斥量,3各信号量,每个队列对应一个Mutex和semaphore,其中Mutex对应队列的具体操作权限,比如弹出队列首,或压入新元素,显然每个时刻只能有一个线程获取具体的操作权限。
而semaphore对应具体队列中的当前可用缓冲块数目,故而应该先获取semaphore,然后再或获取操作权限,双层内核加锁保证最后的线程互斥*/
this->mhEM = CreateSemaphore(NULL, buffer_capacity, buffer_capacity, (ss.str() + "EM").c_str());
this->mhIN = CreateSemaphore(NULL, 0, buffer_capacity, (ss.str() + "IN").c_str());
this->mhOUT = CreateSemaphore(NULL, 0, buffer_capacity, (ss.str() + "OUT").c_str());
this->mhoEM = CreateMutex(NULL, false, (ss.str() + "oEM").c_str());
this->mhoIN = CreateMutex(NULL, false, (ss.str() + "oIN").c_str());
this->mhoOUT = CreateMutex(NULL, false, (ss.str() + "oOUT").c_str());

//初始化空闲缓冲区队列,将当前所有的空闲缓冲块的索引按照顺序依次加入到空闲队列中,这也是
for (int i=0; i<buffer_capacity; i++)
{
this->mqEM.push_back(i);
}
}

/*引用传递的初始化函数设计为无操作,你也可以自己实现,不过在这里不是很有意义*/
IOBuffer::IOBuffer(const IOBuffer &from)
{
}

IOBuffer::~IOBuffer(void)
{
//删除缓冲区
delete[] this->mpBuffers;
//清除信号量
CloseHandle(this->mhEM);
CloseHandle(this->mhIN);
CloseHandle(this->mhOUT);
CloseHandle(this->mhoEM);
CloseHandle(this->mhoIN);
CloseHandle(this->mhoOUT);
}

/*返回指定index的缓冲块buffer的引用*/
fixed_queue<char> &IOBuffer::thebuffer(int number)
{
return this->mpBuffers[number];
}

int IOBuffer::get_buff(int type)
{
queue<int> *pq; //临时队列指针,指向将要操作的缓冲块的首地址
HANDLE handle=NULL,ohandle=NULL;//临时内核对象,用来获取对应队列的mutex和semaphore
//根据类型选择不同的队列和控制句柄
switch (type)
{
case BT_EM:
pq = &this->mqEM;
handle = this->mhEM; //获取空闲队列的准入权限
ohandle = this->mhoEM;//获取空闲队列的操作权限,
break;

case BT_IN:
pq = &this->mqIN;
handle = this->mhIN;
ohandle = this->mhoIN;
break;

case BT_OUT:
pq = &this->mqOUT;
handle = this->mhOUT;
ohandle = this->mhoOUT;
break;

default:
assert(!"unknown type");
break;
}
//申请一个空间,等待信号量
WaitForSingleObject(handle, INFINITE); //先获取具体队列的准入权限,该操作会将该队列的可用资源数减1
int tmp;
//队列操作要互斥
WaitForSingleObject(ohandle, INFINITE); //获取具体队列的操作权限
tmp = pq->pop_front();//将该队列的可用缓冲块的Index弹出
ReleaseMutex(ohandle);//释放具体队列的操作权限
return tmp;
}

void IOBuffer::put_buf(int type, int work_buf)
{
queue<int> *pq;
HANDLE handle=NULL,ohandle=NULL;
//根据类型选择不同的队列和控制句柄
switch (type)
{
case BT_EM:
pq = &this->mqEM;
handle = this->mhEM;
ohandle = this->mhoEM;
break;

case BT_IN:
pq = &this->mqIN;
handle = this->mhIN;
ohandle = this->mhoIN;
break;

case BT_OUT:
pq = &this->mqOUT;
handle = this->mhOUT;
ohandle = this->mhoOUT;
break;

default:
assert(!"unknown type");
break;
}
//释放一个空间,释放信号量
WaitForSingleObject(ohandle, INFINITE);//获取该队列的操作权限
pq->push_back(number);//将number指代的缓冲块加入到该队列的尾部
ReleaseMutex(ohandle);
ReleaseSemaphore(handle, 1, NULL);//将该队列的可用资源计数加1
}

/*为inq队列装入新的数据,先从空闲缓冲块队列中提取一个可用的空闲缓冲块*/
int IOBuffer::accept_in(void)
{
return this->get_buff(BT_EM);
}

int IOBuffer::accept_out(void)
{
return this->get_buff(BT_EM);
}
/*将number号装载号了有效数据的输入缓冲块加入到inq输入缓冲块等待队列中,等待distill_in()函数提取处理*/
void IOBuffer::fill_in(int number)
{
this->put_buf(BT_IN, number);
}

void IOBuffer::fill_out(int number)
{
this->put_buf(BT_OUT, number);
}

/*提取输入数据缓冲队列的函数*/
void IOBuffer::distill_in(void)
{
//拿到一个已满的输入buffer
int number = this->get_buff(BT_IN); //从输入缓冲块队列中提取首部缓冲块,get_buff()函数已经设计好了互斥机制
WaitForSingleObject(OUTPUT_MUTEX, INFINITE); //获取stdout标准输出的输出权限
std::cout << "*****flushing buffer " << number << " to input*****" << std::endl;
std::cout << "##INPUT> ";
//依次将number号缓冲块中存储的字节序列数据依次输出到stdout
while (!this->mpBuffers[number].empty())
{
std::cout << this->mpBuffers[number].pop_front();
}
std::cout << std::endl;
ReleaseMutex(OUTPUT_MUTEX);
//释放掉buffer
this->put_buf(BT_EM, number); //将number号缓冲块加入到空闲队列中,put_buff()函数已经设计号了互斥机制
}

/*提取输出数据缓冲队列的函数*/
void IOBuffer::distill_out(void)
{
//拿到一个已满的输出buffer
int number = this->get_buff(BT_OUT);
WaitForSingleObject(OUTPUT_MUTEX, INFINITE);
std::cout << "flushing buffer " << number << " to output" << std::endl;
std::cout << "##OUTPUT";
while (!this->mpBuffers[number].empty())
{
std::cout << this->mpBuffers[number].pop_front();
}
std::cout << std::endl;
ReleaseMutex(OUTPUT_MUTEX);
//释放掉buffer
this->put_buf(BT_EM, number);
}




2. BufferPool测试程序

test_main.cpp

#include "queue.h"
#include <iostream>
#include "IOBuffer.h"
#include <windows.h>

HANDLE OUTPUT_MUTEX;//使用标准输出设备stdout的互斥锁,任一时刻只有一个线程可以输出,防止把输出搞乱
IOBuffer afxBuff(10, 10);//全局的一个缓冲池,一共有10个buffer,每个buffer空间为10
using namespace _tool;//使用定义在_tool命名空间中的queue,不然回合std中的queue冲突

DWORD WINAPI distill_in(void*);//提取输入,出于显示目的,只设计一个提取输入线程
DWORD WINAPI distill_out(void*);//提取输出,出于显示目的,只设计一个提取输入线程
DWORD WINAPI accept_in(void *para);//收容输入
DWORD WINAPI accept_out(void *para);//收容输出

int main(void)
{
OUTPUT_MUTEX = CreateMutex(NULL, false, "OUTPUT");//先创建一个MUTEX控制输出,FALSE意味着该mutex并不为创建的宿主线程占用,一开始便处于激活状态,可介绍任何线程的声明占用

HANDLE hfuncs[22];//创建22个线程,1各提取输入线程,1个提取输出线程,收容输入和收容输出线程各10个
hfuncs[0] = CreateThread(NULL, 0, distill_in, NULL, 0, NULL);//创建一个提取输入进程
hfuncs[1] = CreateThread(NULL, 0, distill_out, NULL, 0, NULL);//创建一个提取输出进程
//创建10个收容输入进程
for (int i=0; i<10; i++)
{
char c[11]="abcdefghij";
hfuncs[2+i] = CreateThread(NULL, 0, accept_in, c+i, 0, NULL);
}
//创建10个收容输出进程,为了区别收容输入,输出数据全部是用大写
for (int i=0; i<10; i++)
{
char c[11]="ABCDEFGHIJ";
hfuncs[12+i] = CreateThread(NULL, 0, accept_out, c+i, 0, NULL);
}
WaitForMultipleObjects(22, hfuncs, true, 10000);//10s自动结束阻塞等待,否则该程序将一直输出
return 0;
}

DWORD WINAPI distill_in(void*)//不断地检测输入缓冲区队列是否非空,有则输入
{
while (1)
{
Sleep(700);
afxBuff.distill_in();
}
}

DWORD WINAPI distill_out(void*)//不断地检测输出缓冲区队列是否非空,有则输出
{
while (1)
{
Sleep(800);
afxBuff.distill_out();
}
}

DWORD WINAPI accept_in(void *para)//不断检测是否有缓冲区可用,有则输入
{
char c = *((char*)para); //提取输入的单个字符
while (1)
{
Sleep(900);
int number = afxBuff.accept_in();
WaitForSingleObject(OUTPUT_MUTEX,INFINITE);
std::cout << "filling input buffer " << number << " with " << c << std::endl;
ReleaseMutex(OUTPUT_MUTEX);
//将该缓冲块用该字符填满
while (!afxBuff.thebuffer(number).full())
{
afxBuff.thebuffer(number).push_back(c);
}
WaitForSingleObject(OUTPUT_MUTEX,INFINITE);
std::cout << "put buffer " << number << " into input buffer queue" << std::endl;
ReleaseMutex(OUTPUT_MUTEX);
afxBuff.fill_in(number); //将装满的输入缓冲块装入in输入队列中
}
}

DWORD WINAPI accept_out(void *para)//不断检测是否有缓冲区可用,有则输出
{
char c = *((char*)para);
while (1)
{
Sleep(600);
int number = afxBuff.accept_out();
WaitForSingleObject(OUTPUT_MUTEX,INFINITE);
std::cout << "filling output buffer " << number << " with " << c << std::endl;
ReleaseMutex(OUTPUT_MUTEX);
while (!afxBuff.thebuffer(number).full())
{
afxBuff.thebuffer(number).push_back(c);
}
WaitForSingleObject(OUTPUT_MUTEX,INFINITE);
std::cout << "put buffer " << number << " into output buffer queue" << std::endl;
ReleaseMutex(OUTPUT_MUTEX);
afxBuff.fill_out(number);
}
}


在Windows下VC编译器运行该程序,得到结果如下



这里的缓冲池技术实现的较为简单,实际的缓冲池实现涉及到诸多的内核知识,诸如系统中断、内存管理等,本文只是出于演示的目的,力求通过分析该简单的程序讲解缓冲池技术实现的重点。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  操作系统 缓冲池