用PHP 写的基于 Memcache 的 Queue
2010-07-29 12:09
309 查看
<?php class MQ{ public static $client; private static $m_real; private static $m_front; private static $m_data = array(); const QUEUE_MAX_NUM = 100000000; const QUEUE_FRONT_KEY = '_queue_item_front'; const QUEUE_REAL_KEY = '_queue_item_real'; public static function setupMq($conf) { self::$client = memcache_pconnect($conf); self::$m_real = memcache_get(self::$client, self::QUEUE_REAL_KEY); self::$m_front = memcache_get(self::$client, self::QUEUE_FRONT_KEY); if (!isset(self::$m_real) || empty(self::$m_real)) { self::$real= 0; } if (!isset(self::$m_front) || empty(self::$m_front)) { self::$m_front = 0; } return self::$client; } public static function add($queue, $data) { $result = false; if (self::$m_real < self::QUEUE_MAX_NUM) { if (memcache_add(self::$client, $queue.self::$m_real, $data)) { self::mqRealChange(); $result = true; } } return $result; } public static function get($key, $count) { $num = 0; for ($i=self::$m_front;$i<self::$m_front + $count;$i++) { if ($dataTmp = memcache_get(self::$client, $key.$i)) { self::$m_data[] = $dataTmp; memcache_delete(self::$client, $key.$i); $num++; } } if ($num>0) { self::mqFrontChange($num); } return self::$m_data; } private static function mqRealChange() { memcache_add(self::$client, self::QUEUE_REAL_KEY, 0); self::$m_real = memcache_increment(self::$client, self::QUEUE_REAL_KEY, 1); } private static function mqFrontChange($num) { memcache_add(self::$client, self::QUEUE_FRONT_KEY, 0); self::$m_front = memcache_increment(self::$client, self::QUEUE_FRONT_KEY, $num); } public static function mflush($memcache_obj) { memcache_flush($memcache_obj); } public static function Debug() { echo 'real:'.self::$m_real."<br>/r/n"; echo 'front:'.self::$m_front."<br>/r/n"; echo 'wait for process data:'.intval(self::$m_real - self::$m_front); echo "<br>/r/n"; echo '<pre>'; print_r(self::$m_data); echo '<pre>'; } } define('FLUSH_MQ',0);//CLEAN ALL DATA define('IS_ADD',0);//SET DATA $mobj = MQ::setupMq('127.0.0.1','11211'); if (FLUSH_MQ) { MQ::mflush($mobj); } else { if (IS_ADD) { MQ::add('user_sync', '1test'); MQ::add('user_sync', '2test'); MQ::add('user_sync', '3test'); MQ::add('user_sync', '4test'); MQ::add('user_sync', '5test'); MQ::add('user_sync', '6test'); } else { MQ::get('user_sync', 10); } } MQ::Debug();
MQ::setupMq('127.0.0.1','11211');//连接
MQ::add($key, $value);//添加数据到队列
MQ::add($key, $value);//添加数据到队列
MQ::add($key, $value);//添加数据到队列
MQ::add($key, $value);//添加数据到队列
MQ::add($key, $value);//添加数据到队列
MQ::add($key, $value);//添加数据到队列
MQ:get($key, 10);//取出一定数量的数据
数据被取出后会从队列中消失。
相关文章推荐
- 用PHP写的基于Memcache的Queue实现代码
- 用PHP写的基于Memcache的Queue实现代码
- 负载均衡基于Cookie OpenRest+tomcat+php+memcache+Jsp
- 基于php使用memcache存储session的详解(转)
- php学习笔记(二十八)session的高级管理(基于数据库和memcache的)
- 基于javascript、ajax、memcache和PHP实现的简易在线聊天室
- Linux 安装基于(PHP5.5)memcache扩展
- 基于php使用memcache存储session的详解
- memcache 和 memcached的php拓展(基于centos 7)
- 基于php使用memcache存储session的详解
- php memcache扩展 出现错误dyld: Symbol not found: _mmc_queue_free
- 基于php的一个最简单的memcache的分布式算法
- 基于php使用memcache存储session的详解
- PHP中封装MEMCACHE常用方法基于YII框架下
- 基于javascript、ajax、memcache和PHP实现的简易在线聊天室
- 为基于 MAC OS X 的 php 开发环境安装 memcache
- 基于php使用memcache存储session的详解
- 基于PHP客户端的TokyoTyrant(TCH, TCB, TCT), Memcache, Mysql性能测试
- 基于php使用memcache存储session的详解
- 基于PHP客户端的TokyoTyrant(TCH, TCB, TCT), Memcache, Mysql性能测试(测试脚本)