您的位置:首页 > 数据库 > Redis

Redis的消息队列(N年前的项目代码)

2017-06-12 11:20 183 查看
代码如下

工具库 tools.inc.php

<?php
/**
* sys tools
*/

/**
* [get_redis description]
* @return [type] [description]
*/
function get_redis()
{
$host = "127.0.0.1";
$port = 6679;
$pwd  = "password";

try {
$redis = new Redis();
$redis->connect($host, $port);

if ($pwd) {
$result = $redis->auth($pwd);
if (!$result) {
log("redis_access", "redis auth failed");
return false;
}
}

return $redis;
} catch (RedisException $e) {
//redis 异常日志
log("redis_access", $e->getMessage());
return false;
}
}

/**
* 日记
* @param  [type] $name [description]
* @param  [type] $msg  [description]
* @return [type]       [description]
*/
function log($name, $msg)
{
$log_file = $name . date("Y_m_d") . ".log";
$log_msg  = date("Y-m-d H:i:s") . "\t" . $msg . PHP_EOL;
file_put_contents($log_file, $log_msg, FILE_APPEND);
}


队列 producer

<?php
/*
*将收到的消息放入队列
*
*/
require_once('tools.inc.php');

$redis = get_redis();

if (!$redis) {
log("queen.log", "redis connect failed!");
exit();
}

//入队列
$channel = "queen_name";

$redis->rpush($channel, serialize($message));


队列 consumer

#! /usr/bin/env php
<?php
/*
|-------------------------------------------------------------------------------------------------
|$channel 队列名
|$max_handle_size 每次处理的最大量
|crontab task run peer 10 seconds
|-------------------------------------------------------------------------------------------------
|* * * * * sleep  0; /path/to/queen_handler.php >>/dev/null 2>&1
|* * * * * sleep 10; /path/to/queen_handler.php >>/dev/null 2>&1
|* * * * * sleep 20; /path/to/queen_handler.php >>/dev/null 2>&1
|* * * * * sleep 30; /path/to/queen_handler.php >>/dev/null 2>&1
|* * * * * sleep 40; /path/to/queen_handler.php >>/dev/null 2>&1
|* * * * * sleep 50; /path/to/queen_handler.php >>/dev/null 2>&1
|-------------------------------------------------------------------------------------------------
*/

require_once('tools.inc.php');

$channel = "queen_name";

$redis = get_redis();

if (!$redis) {
log("queen.log", "redis connect failed!");
exit();
}

//每次出队的最大值
$max_handle_size = 2000;

$queen_len = $redis->llen($channel);

if ($queen_len == 0) { //队列中没有数据则退出
exit();
}

if (0 < $queen_len && $queen_len <= $max_handle_size) { //队列长度小于最大入库长度则直接全部入库
$cursor_cur  = $queen_len - 1; //截断游标
$cursor_next = $queen_len;
$message_queen = $redis->lrange($channel, 0, $cursor_cur);
$redis->ltrim($channel, $cursor_next, -1);
} elseif ($queen_len > $max_handle_size) { //队列长度小于最大入库长度则分批入库
$cursor_cur  = $max_handle_size - 1;
$cursor_next = $max_handle_size;
$message_queen = $redis->lrange($channel, 0, $cursor_cur);
$redis->ltrim($channel, $cursor_next, -1);
}

// 处理消息队列
$message_arr = array_map(function ($message) {
return unserialize($message);
}, $message_queen);

// 下面就是你的处理数据的逻辑了
code to process message

就是这样啦,很简单
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息