您的位置:首页 > 数据库 > Redis

基于redis+mysql+php的简单队列实现

2017-07-25 13:23 906 查看
消息队列在是分布式系统中必不可少的中间件,目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,Kafka,RocketMQ等。然而对于一个要求不高的小型系统来说,单独使用维护这些队列系统代价有点大。而redis可以在做缓存的同时也满足队列的需求。redis的list是有序的列表,加上其出队入队函数,利用其特性很简单的就能实现一个消息队列。

 

一、业务层邮件入队列

入队列使用redis的lpush指令

 

LPUSH key value1 [value2]

将一个或多个值插入到列表头部

 

先将邮件详情写入mysql存储,再将该邮件id push到redis队列头部

function sendSysMail($userId, $type, $content)

{

        $sysSmsLogModel = M('SysEmailLog');

        $data = array(

                'user_id' => $userId,

                'content' => $content,

                'add_time' => time(),

                'date_time' => date('Y-m-d H:i:s', time()),

                'add_ip' => get_client_ip(),

        );

        if($sysSmsLogModel->create($data))

        {

                $emailId = $sysSmsLogModel->add();    //邮件ID

                //添加邮件队列

                $redis = new Redis();

                $redis->connect(C('REDIS_HOST'), C('REDIS_PORT'));

                $redis->lPush('mailQueue', $emailId);  //lPush将邮件ID入到队列头部

                return TRUE;

        }

        else

        {

                return FALSE;

        }

}

二、后台队列监听
后台执行一个PHP进程,监听队列,对队列循环进行出队列操作。一旦队列有数据,就从mysql中取对应的邮件详细情况进行发送。

出队操作使用BRPOP命令

 

BRPOP key1 [key2 ] timeout

移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。

 

使用过程中发生了件奇怪的事情,后台进程开启后开始执行的好好的,并没有什么异常,但跑了一晚上,第二天再来看时便不正常了。

看了下redis的数据,都正常的出入队列了,但是邮件并未发送,日志中也没有发送失败的记录。

一番折腾,发现是mysql连接这块存在问题,mysql默认的连接超时时间是28800秒,即8小时,超过8小时,mysql连接就断开了,过了一晚上,mysql自然就go away了。

于是在没有队列没有数据的时候执行select 1来延长mysql连接的超时时间点,select 1执行异常时重新建立一个mysql的连接替换之前的连接,以此来保持mysql的连接始终可用。

 

<?php

//mailQueue.php

$config = include('config.php');

$redis = new Redis();

$redis->connect($config['REDIS_HOST'], $config['REDIS_PORT']);

$pdo= new PDO($config['DSN'], $config['DB_USER'], $config['DB_PWD'], array(PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION));

 

while (true)

{

    $task = $redis->brPop('mailQueue', 10);    //brPop从队列尾部出队

    if ($task)

    {

        $emailId = $task[1];

        $sql = "SELECT a.*, b.email FROM sys_email_log AS a, user_main AS b WHERE a.user_id = b.user_id AND a.email_id = {$emailId}";

        $res = $pdo->query($sql);

        foreach ($res as $row)

        {

            $to = $row['email'];

            $title = '邮件提醒';

            $content = $row['content'];

            $result = send($to, $title, $content);    //send邮件发送函数,调用外部系统邮件接口

            if ($result)

            {

                echo date("Y-m-d H:i:s", time()) . ": " . $emailId . "发送成功\n";

            }

            else

            {

                echo date("Y-m-d H:i:s", time()) . ": " . $emailId . "发送失败\n";

            }       

        }

    }

    else

    {

        try

        {

            $pdo->query("SELECT 1");

            echo date("Y-m-d H:i:s", time()) . ": 保持连接\n";

        }

        catch (PDOException $e)

        {

            $pdo = new PDO($config['DSN'], $config['DB_USER'], $config['DB_PWD'], array(

                PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION));

            echo date("Y-m-d H:i:s", time()) . ": mysql重连\n";

        }

    }

}

 

?>

 

 
三、队列监听程序的守护进程

用shell写的守护进程,方便队列监听程序的开启关闭以及状态查看

#!/bin/bash

#开启

function start()

{

    #先检测程序是否已经开启

    pid=`ps -ef | grep "php -f mailQueue.php" | grep -v grep | awk '{print $2}'`

    if [ "$pid" == "" ]

    then

        php -f mailQueue.php >> mailLog &

        echo "程序启动成功"

    else

        echo "程序已经开启过"

    fi

}

#关闭

function stop()

{

    pid=`ps -ef | grep "php -f mailQueue.php" | grep -v grep | awk '{print $2}'`

    if [ "$pid" == "" ]

    then

        echo "程序未开启"

    else

        kill -9 $pid

        echo "程序关闭成功"

    fi

}

#查看开启状态

function status()

{

    pid=`ps -ef | grep "php -f mailQueue.php" | grep -v grep | awk '{print $2}'`

    if [ "$pid" == "" ]

    then

        echo "程序未开启"

    else

        echo "程序运行中,pid: $pid"

    fi

}

#主程序

case "$1" in

"start")

    start

    ;;

"stop" )

    stop

    ;;

* )

    echo "参数错误! Usage: mailQueue [start|stop|status]"

    ;;

esac

四、运行演示

1.使用守护进程脚本启动队列后台监听进程



 

2.数据库中添加一条测试数据

 


3.redis手动入队数据,模拟业务入队

 


4.查看邮件发送日志



 

五、总结

这是一个很简单的消息队列程序,实现了消息队列最基本的功能。但要想稳定运行还有很多需要优化处理的地方。

1.程序只考虑了最简单的状况,对于邮件发送失败的情况只是打了日志,并未做任何处理。可以在数据库邮件详情表增加记录邮件发送失败次数的字段。邮件发送失败后,累加记录失败次数,设置一个最大失败次数,小于最大失败次数则重新入队列,再次发送。直到达到最大失败次数之后不再发送,以此来提高邮件的发送成功率。

2.重新启动后数据的恢复,redis本身有完善数据备份机制,这点可以通过redis本身机制来实现。



 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  mysql php redis