您的位置:首页 > 数据库 > Redis

EasyCMS在幼儿园视频直播项目实战中以redis操作池的方式应对高并发的redis操作问题

2017-05-29 18:28 477 查看
在之前的博客《 EasyDarwin幼教云视频平台在幼教平台领域大放异彩!》中我们也介绍到,EasyCMS+EasyDarwin+redis形成的EasyDarwin云平台方案,在幼教平台领域中稳定运营,收到了用户的良好口碑;

随着幼儿园平台用户和接入幼儿园的数量不断增加,EasyCMS的redis操作也越来越频繁,我们在运维中发现EasyCMS的cpu占用非常高,通过线程分析,发现大家都在等待一个redis操作对象,redis操作对象锁造成了资源竞争,于是,我们决定采用redis操作池,我们的基本思路是:

初始化建立16个redis操作对象RedisHandler,并保持连接redis server;

采用链表的方式管理RedisHandler,先进先出的方式从链表获取RedisHandler;

当从链表获取不到RedisHandler,表示资源不足,动态new一个RedisHandler;

每一次操作Redis完成后,将RedisHandler插入回链表;

限定一个RedisHandler链表的最大长度,当超过最大长度时,直接对RedisHandler进行销毁,不插回链表;

基于这个思路,我们实现了一个RedisHandler类:

#ifndef __EASY_REDIS_HANDLER_H__
#define __EASY_REDIS_HANDLER_H__

#include "OSHeaders.h"
#include "QTSServerInterface.h"
#ifdef WIN32
#include "Windows/hiredis.h"
#else
#include "hiredis.h"
#endif //WIN32

#include "Task.h"

class RedisReplyObjectDeleter
{
public:
RedisReplyObjectDeleter() : fReply(NULL) {}
explicit RedisReplyObjectDeleter(redisReply* reply) : fReply(reply)  {}
~RedisReplyObjectDeleter()
{
if (fReply)
{
freeReplyObject(fReply);
}
}

void ClearObject() { fReply = NULL; }

void SetObject(redisReply* reply)
{
fReply = reply;
}
redisReply* GetObject() const { return fReply; }

private:

redisReply* fReply;
};

class EasyRedisHandler : public Task
{
public:

EasyRedisHandler(const char* ip, UInt16 port, const char* passwd);
virtual ~EasyRedisHandler();

QTSS_Error RedisTTL();

QTSS_Error RedisSetDevice(Easy_DeviceInfo_Params* inParams);
QTSS_Error RedisDelDevice(Easy_DeviceInfo_Params* inParams);
QTSS_Error RedisGetAssociatedDarwin(QTSS_GetAssociatedDarwin_Params* inParams);

OSQueueElem     fQueueElem;
UInt32          fID;
private:

virtual SInt64  Run();

bool            sIfConSucess;
OSMutex         sMutex;
redisContext*   redisContext_;

char            fRedisIP[128];
UInt16          fRedisPort;
char            fRedisPasswd[256];

bool RedisConnect();
void RedisErrorHandler();
};

#endif //__EASY_REDIS_HANDLER_H__


#include "EasyRedisHandler.h"

#include "QTSSMemoryDeleter.h"
#include "Format.h"
#include "Resources.h"

static UInt32 sRedisHandlerID = 0;

EasyRedisHandler::EasyRedisHandler(const char* ip, UInt16 port, const char* passwd)
: fQueueElem(),
fID(sRedisHandlerID++),
sIfConSucess(false),
sMutex(),
redisContext_(NULL)
{
this->SetTaskName("EasyRedisHandler");

fQueueElem.SetEnclosingObject(this);

::strcpy(fRedisIP, ip);
fRedisPort = port;
::strcpy(fRedisPasswd, passwd);

this->Signal(Task::kStartEvent);
}

EasyRedisHandler::~EasyRedisHandler()
{
RedisErrorHandler();
}

SInt64 EasyRedisHandler::Run()
{
OSMutexLocker locker(&sMutex);

EventFlags theEvents = this->GetEvents();

RedisConnect();

return 0;
}

bool EasyRedisHandler::RedisConnect()
{
if (sIfConSucess)
{
return true;
}

bool theRet = false;
do
{
struct timeval timeout = { 2, 0 }; // 2 seconds
redisContext_ = redisConnectWithTimeout(fRedisIP, fRedisPort, timeout);
if (!redisContext_ || redisContext_->err)
{
if (redisContext_)
{
printf("Redis context connect error \n");
}
else
{
printf("Connection error: can't allocate redis context\n");
}

theRet = false;
break;
}

string auth = Format("auth %s", string(fRedisPasswd));
redisReply* reply = static_cast<redisReply*>(redisCommand(redisContext_, auth.c_str()));

RedisReplyObjectDeleter replyDeleter(reply);
if (!reply || string(reply->str) != string("OK"))
{
printf("Redis auth error\n");
theRet = false;
break;
}

theRet = true;
sIfConSucess = true;

printf("Connect Redis success\n");

} while (0);

if (!theRet && redisContext_)
{
RedisErrorHandler();
}

return theRet;
}


在外围链表管理上,一开始我们就初始化16个RedisHandler:

for (UInt32 numPackets = 0; numPackets < 16; numPackets++)
{
EasyRedisHandler* handler = new EasyRedisHandler(sRedis_IP, sRedisPort, sRedisPassword);
sFreeHandlerQueue.EnQueue(&handler->fQueueElem);//put this packet onto the free queue
}


再各定义一个获取RedisHandler和回收RedisHandler的方法:

EasyRedisHandler* GetRedisHandler()
{
OSMutexLocker locker(&sMutex);
if (sFreeHandlerQueue.GetLength() == 0)
//if the port number of this socket is odd, this packet is an RTCP packet.
return new EasyRedisHandler(sRedis_IP, sRedisPort, sRedisPassword);
else
return (EasyRedisHandler*)sFreeHandlerQueue.DeQueue()->GetEnclosingObject();
}

void RedisHandlerReclaim(EasyRedisHandler* handler)
{
if(handler)
{
printf("RedisHandlerReclaim ID:%d \n", handler->fID);
if(sFreeHandlerQueue.GetLength() > sMaxRedisClientPoolSize)
handler->Signal(Task::kKillEvent);
else
sFreeHandlerQueue.EnQueue(&handler->fQueueElem);
}
}


于是,完整的redis操作池方法就实现了,基于这个方法实现的操作池,完全满足了项目上的需求;

请关注我们的EasyDarwin开源项目:https://github.com/EasyDarwin

获取更多信息

邮件:support@easydarwin.org

WEB:www.EasyDarwin.org

Copyright © EasyDarwin.org 2012-2017

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐