您的位置:首页 > 数据库 > Redis

Redis 消息队列

2018-03-01 17:19 176 查看
//消息类
package com.daqi.redis.project.messagequeue;
import java.io.Serializable;
/**
 * Simple serializable message made of a numeric id and a text body.
 *
 * <p>Instances are mutable: both properties can be changed through their
 * setters after construction.
 */
public class Message implements Serializable {

    /** Fixed serialization version so stored messages stay readable. */
    private static final long serialVersionUID = -3781098484588713947L;

    /** Identifier of the message. */
    private int id;

    /** Text body of the message; may be {@code null}. */
    private String content;

    /**
     * Creates a message.
     *
     * @param id identifier of the message
     * @param content text body of the message
     */
    public Message(int id, String content) {
        this.id = id;
        this.content = content;
    }

    /**
     * @return the identifier of this message
     */
    public int getId() {
        return this.id;
    }

    /**
     * @param id the new identifier
     */
    public void setId(int id) {
        this.id = id;
    }

    /**
     * @return the text body of this message
     */
    public String getContent() {
        return this.content;
    }

    /**
     * @param content the new text body
     */
    public void setContent(String content) {
        this.content = content;
    }
}

//对象转为二进制工具类
package com.daqi.redis.project.messagequeue;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

/**
 * Helpers that convert objects to and from byte arrays using Java's built-in
 * serialization.
 */
public class ObjectUtil {

    /**
     * Serializes an object into a byte array.
     *
     * @param obj the object to serialize; must be serializable (or {@code null},
     *            which is written as a serialized null reference)
     * @return the serialized form of {@code obj}
     * @throws Exception if the object cannot be serialized or writing fails
     */
    public static byte[] objectToBytes(Object obj) throws Exception {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(buffer);
        try {
            out.writeObject(obj);
            // Push any data still held by the object stream into the buffer
            // before the buffer is snapshotted below.
            out.flush();
        } finally {
            // Closing the object stream also closes the underlying buffer.
            out.close();
        }
        return buffer.toByteArray();
    }

    /**
     * Deserializes an object from a byte array produced by
     * {@link #objectToBytes(Object)}.
     *
     * <p>NOTE(review): this uses native Java deserialization, which is unsafe
     * for data coming from an untrusted source. Only feed it bytes this
     * application wrote itself.
     *
     * @param bytes the serialized form, or {@code null}
     * @return the deserialized object, or {@code null} when {@code bytes} is
     *         {@code null}
     * @throws Exception if the bytes are not a valid serialized object or the
     *                   class of the object cannot be found
     */
    public static Object bytesToObject(byte[] bytes) throws Exception {
        if (bytes == null) {
            // Nothing to decode, e.g. the key was missing or the queue was empty.
            return null;
        }
        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
        try {
            return in.readObject();
        } finally {
            in.close();
        }
    }

}

//redis工具类
package com.daqi.redis.project.messagequeue;

import java.util.List;
import java.util.Map;
import java.util.Set;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
 * Static convenience wrappers around a shared {@link JedisPool}.
 *
 * <p>Every operation borrows a connection from the pool, runs one or two Redis
 * commands and gives the connection back. Failures are not propagated: the
 * exception's stack trace is printed and the method returns {@code null}
 * (or {@code 0} for {@link #llen(byte[])}); {@code void} methods simply return.
 */
public class JedisUtil {
    // Connection settings. These could be loaded from a configuration file
    // instead; in particular the password should not live in source code.
    private static final String JEDIS_IP = "127.0.0.1";
    private static final int JEDIS_PORT = 6379;
    private static final String JEDIS_PASSWORD = "123456";
    // Timeout handed to JedisPool (milliseconds according to the Jedis docs).
    private static final int JEDIS_TIMEOUT = 60000;
    // Maximum number of connections the pool may hold.
    private static final int MAX_TOTAL = 20;

    private static final JedisPool jedisPool;

    static {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(MAX_TOTAL);
        // Idle connections can never usefully exceed the total, so cap at the same value.
        config.setMaxIdle(MAX_TOTAL);
        jedisPool = new JedisPool(config, JEDIS_IP, JEDIS_PORT, JEDIS_TIMEOUT, JEDIS_PASSWORD);
    }

    /**
     * Reads the string value stored at a key.
     *
     * @param key the key to read
     * @return the stored value, or {@code null} if the command returned none or failed
     */
    public static String get(String key) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            return jedis.get(key);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        } finally {
            close(jedis);
        }
    }

    /**
     * Gives a connection back exactly once. Never throws.
     *
     * <p>Relies on {@code Jedis.close()}, which for a pooled connection hands
     * it back to its pool (and lets the pool discard it if it is broken).
     * NOTE(review): requires a Jedis version where {@code close()} is
     * pool-aware — confirm against the project's dependency.
     *
     * @param jedis the connection to release; {@code null} is ignored
     */
    public static void close(Jedis jedis) {
        if (jedis == null) {
            // getResource() failed, so there is nothing to release.
            return;
        }
        try {
            jedis.close();
        } catch (Exception e) {
            // Could not hand the connection back; make sure the socket is not leaked.
            try {
                if (jedis.isConnected()) {
                    jedis.disconnect();
                }
            } catch (Exception ignored) {
                // Best-effort cleanup only; nothing more can be done here.
            }
        }
    }

    /**
     * Reads the binary value stored at a key.
     *
     * @param key the key to read
     * @return the stored value, or {@code null} if the command returned none or failed
     */
    public static byte[] get(byte[] key) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            return jedis.get(key);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        } finally {
            close(jedis);
        }
    }

    /**
     * Stores a binary value at a key.
     *
     * @param key the key to write
     * @param value the value to store
     */
    public static void set(byte[] key, byte[] value) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.set(key, value);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            close(jedis);
        }
    }

    /**
     * Stores a binary value at a key and then sets an expiry on the key.
     *
     * <p>The value and the expiry are sent as two separate commands.
     *
     * @param key the key to write
     * @param value the value to store
     * @param time expiry passed to {@code EXPIRE} (seconds)
     */
    public static void set(byte[] key, byte[] value, int time) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.set(key, value);
            jedis.expire(key, time);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            close(jedis);
        }
    }

    /**
     * Sets a field of a hash (binary variant).
     *
     * @param key the hash key
     * @param field the field inside the hash
     * @param value the value to store
     */
    public static void hset(byte[] key, byte[] field, byte[] value) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.hset(key, field, value);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            close(jedis);
        }
    }

    /**
     * Sets a field of a hash (string variant).
     *
     * @param key the hash key
     * @param field the field inside the hash
     * @param value the value to store
     */
    public static void hset(String key, String field, String value) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.hset(key, field, value);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            close(jedis);
        }
    }

    /**
     * Reads a field of a hash (string variant).
     *
     * @param key the hash key
     * @param field the field inside the hash
     * @return the field value, or {@code null} if the command returned none or failed
     */
    public static String hget(String key, String field) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            return jedis.hget(key, field);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        } finally {
            close(jedis);
        }
    }

    /**
     * Reads a field of a hash (binary variant).
     *
     * @param key the hash key
     * @param field the field inside the hash
     * @return the field value, or {@code null} if the command returned none or failed
     */
    public static byte[] hget(byte[] key, byte[] field) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            return jedis.hget(key, field);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        } finally {
            close(jedis);
        }
    }

    /**
     * Removes a field from a hash.
     *
     * @param key the hash key
     * @param field the field to remove
     */
    public static void hdel(byte[] key, byte[] field) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.hdel(key, field);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            close(jedis);
        }
    }

    /**
     * Pushes a value onto a list with {@code LPUSH}.
     *
     * @param key the list key
     * @param value the element to push
     */
    public static void lpush(byte[] key, byte[] value) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.lpush(key, value);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            close(jedis);
        }
    }

    /**
     * Pushes a value onto a list with {@code RPUSH}.
     *
     * @param key the list key
     * @param value the element to push
     */
    public static void rpush(byte[] key, byte[] value) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.rpush(key, value);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            close(jedis);
        }
    }

    /**
     * Runs {@code RPOPLPUSH} from one list to another. The element returned
     * by the command is not handed back to the caller.
     *
     * @param key the source list key
     * @param destination the destination list key
     */
    public static void rpoplpush(byte[] key, byte[] destination) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.rpoplpush(key, destination);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            close(jedis);
        }
    }

    /**
     * Reads the whole list with {@code LRANGE key 0 -1}. Despite the method
     * name, nothing is popped by this call.
     *
     * @param key the list key
     * @return the elements returned by the command, or {@code null} if it failed
     */
    public static List<byte[]> lpopList(byte[] key) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            return jedis.lrange(key, 0, -1);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        } finally {
            close(jedis);
        }
    }

    /**
     * Pops one element from a list with {@code RPOP}.
     *
     * @param key the list key
     * @return the popped element, or {@code null} if the command returned none or failed
     */
    public static byte[] rpop(byte[] key) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            return jedis.rpop(key);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        } finally {
            close(jedis);
        }
    }

    /**
     * Sets several fields of a hash at once.
     *
     * @param key the hash key; its {@code toString()} value is used
     * @param hash field/value pairs to store
     */
    public static void hmset(Object key, Map<String, String> hash) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.hmset(key.toString(), hash);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            close(jedis);
        }
    }

    /**
     * Sets several fields of a hash at once and then sets an expiry on the hash.
     *
     * @param key the hash key; its {@code toString()} value is used
     * @param hash field/value pairs to store
     * @param time expiry passed to {@code EXPIRE} (seconds)
     */
    public static void hmset(Object key, Map<String, String> hash, int time) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            String redisKey = key.toString();
            jedis.hmset(redisKey, hash);
            jedis.expire(redisKey, time);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            close(jedis);
        }
    }

    /**
     * Reads several fields of a hash at once.
     *
     * @param key the hash key; its {@code toString()} value is used
     * @param fields the fields to read
     * @return the values returned by {@code HMGET}, or {@code null} if it failed
     */
    public static List<String> hmget(Object key, String... fields) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            return jedis.hmget(key.toString(), fields);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        } finally {
            close(jedis);
        }
    }

    /**
     * Lists the field names of a hash.
     *
     * @param key the hash key
     * @return the field names returned by {@code HKEYS}, or {@code null} if it failed
     */
    public static Set<String> hkeys(String key) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            return jedis.hkeys(key);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        } finally {
            close(jedis);
        }
    }

    /**
     * Reads a range of a list with {@code LRANGE}.
     *
     * @param key the list key
     * @param from start index passed to the command
     * @param to end index passed to the command
     * @return the elements returned by the command, or {@code null} if it failed
     */
    public static List<byte[]> lrange(byte[] key, int from, int to) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            return jedis.lrange(key, from, to);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        } finally {
            close(jedis);
        }
    }

    /**
     * Reads every field and value of a hash.
     *
     * @param key the hash key
     * @return the map returned by {@code HGETALL}, or {@code null} if it failed
     */
    public static Map<byte[], byte[]> hgetAll(byte[] key) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            return jedis.hgetAll(key);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        } finally {
            close(jedis);
        }
    }

    /**
     * Deletes a key.
     *
     * @param key the key to delete
     */
    public static void del(byte[] key) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.del(key);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            close(jedis);
        }
    }

    /**
     * Returns the length of a list.
     *
     * @param key the list key
     * @return the length reported by {@code LLEN}, or {@code 0} if the command failed
     */
    public static long llen(byte[] key) {
        long len = 0;
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            // Keep the command's result; it used to be discarded, so 0 was always returned.
            len = jedis.llen(key);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            close(jedis);
        }
        return len;
    }

}

//测试类
package com.daqi.redis.project.messagequeue;

/**
 * Redis queue example: pushes three messages onto a list and pops one back.
 *
 * 2018-3-1
 * @author baby
 */
@SuppressWarnings("unused")
public class MyTest {
    /** Redis key of the list used as the queue. */
    public static byte[] redisKey = "key".getBytes();

    /**
     * Fills the queue and then consumes a single message; any failure is
     * printed instead of propagated.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        try {
            init();
            pop();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Takes one entry from the queue with {@code rpop} and prints it, or
     * prints a notice when no entry was returned.
     *
     * @throws Exception if the entry cannot be deserialized
     */
    private static void pop() throws Exception {
        byte[] payload = JedisUtil.rpop(redisKey);
        if (payload == null) {
            System.out.println("消息已经取完了");
            return;
        }
        Message received = (Message) ObjectUtil.bytesToObject(payload);
        if (received != null) {
            System.out.println(received.getId() + "   " + received.getContent());
        }
    }

    /**
     * Serializes three sample messages and pushes them with {@code lpush},
     * one after another.
     *
     * @throws Exception if a message cannot be serialized
     */
    private static void init() throws Exception {
        Message first = new Message(1, "内容1");
        byte[] firstBytes = ObjectUtil.objectToBytes(first);
        JedisUtil.lpush(redisKey, firstBytes);

        Message second = new Message(2, "内容2");
        byte[] secondBytes = ObjectUtil.objectToBytes(second);
        JedisUtil.lpush(redisKey, secondBytes);

        Message third = new Message(3, "内容3");
        byte[] thirdBytes = ObjectUtil.objectToBytes(third);
        JedisUtil.lpush(redisKey, thirdBytes);
    }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息