您的位置:首页 > 数据库 > Redis

redis事务、管道及消息通知探究

2016-03-25 02:36 567 查看
一.事务

redis中使用事务,multi表示事务开始,对redis进行一些列操作之后再用exec提交事务,对应的方法分别是Transaction jedis.multi(),List<Object> tx.exec()。需要注意的是,开启了事务,在提交事务前,操作redis不是用Jedis API,而是用Transaction API(Jedis API的绝大部分方法在Transaction API都有对应),否则会报JedisDataException异常:Cannot use Jedis when in Multi。还有一点,同关系型数据库事务不同,如果redsi事务中有命令执行错误,不会回滚,其他命令还是会执行的。redis的事务是原子性的,在命令执行的过程中不会被事务外的其他命令插入。

还可以在multi之前用watch来监视某一个键,若在事务执行前此键的值发生变化,则事务就不会执行。

示例:

public static void main(String[] args) {
Jedis jedis = new Jedis("192.168.2.128", 6379);
System.out.println(jedis.keys("*"));
jedis.flushDB();
jedis.set("name", "zhangsan");
jedis.watch("name");
Transaction tx = jedis.multi();
for (int i = 0; i < 10; i++) {
tx.lpush("studentName", "lisi" + i);
}
tx.exec();
System.out.println(jedis.keys("*"));
jedis.close();
}


二.管道

通过管道可以一次性发送多条命令并在执行完之后一次性将结果返回,非常节省时间。

示例:

public static void main(String[] args) {
Jedis jedis = new Jedis("192.168.2.128", 6379);
System.out.println(jedis.keys("*"));
jedis.flushDB();
jedis.set("name", "zhangsan");
Pipeline pline = jedis.pipelined();
for (int i = 0; i < 100; i++) {
pline.lpush("studentName", "lisi" + i);
}
pline.sync();
System.out.println(jedis.keys("*"));
jedis.close();
}


值得注意的是,不能事务提交前或者管道关闭前调用redis.clients.jedis.Response的get()方法查询执行结果,否则会报JedisDataException: Please close pipeline or multi block before calling this method.

例如,如果在第一个示例代码中的tx.exec();前加上以下代码查看事务执行结果时就会报异常:

Response<List<String>> response = tx.lrange("studentName", 0, -1);
System.out.println(response.get());

同样,如果在第二个示例代码中的pline.sync();前加上以下代码查看管道执行结果时也会报异常:

Response<List<String>> response = pline.lrange("studentName", 0, -1);
System.out.println(response.get());

三.消息通知

消息通知有两种模式:点对点模式,即生产者发出消息,有且只有一个消费者可以接收并处理消息;发布/订阅模式,即生产者发出消息,可以有多个消费者同时接收并处理消息。

1.点对点模式

点对点模式又叫任务队列,操作的是列表类型的键,生产者线程不停的往队列里丢任务(lpush),消费者线程不停的从队列中取任务(rpop或者brpop)。rpop与brpop的区别在于前者在指定的时间内取不到值会返回nil(jedis返回空字符串),而后者在指定的时间内取不到值的话不会返回,会一直在那里等,直到取到值然后返回。

对应的jedis方法是List<String> brpop(int timeout, String... keys) ,返回的是一个list,这个list的长度为2,第一个元素是弹出值的键的名字,第二个元素是弹出的元素。如果方法中传入不止一个列表类型键,则会优先从第一个键弹值,如果第一个列表类型键没有元素,则会从第二个键弹值,以此类推,这也被称为优先级队列。如果都没有元素的话,就会一直等,阻塞线程。

示例:

public static void main(String[] args) {

new Thread(new Runnable() {
Jedis jedis = new Jedis("192.168.83.128", 6379);

public void run() {
while (true) {
String str = UUID.randomUUID().toString();
jedis.lpush("hobbies", str);
System.out.println(Thread.currentThread().getName() + "向" + "hobbies插入" + str);
}
}
}, "生产者线程").start();

new Thread(new Runnable() {
Jedis jedis = new Jedis("192.168.83.128", 3679);

public void run() {
while (true) {
List<String> list = jedis.brpop(0, "hobbies");
System.out.println(Thread.currentThread().getName() + "从" + list.get(0) + "取出" + list.get(1));
}
}
}, "消费者线程").start();
}


2.发布/订阅模式

发布订阅模式包含两种角色,分别是发布者和订阅者。发布者可以依次向多个频道发布消息,也可以有多个订阅者,各自订阅一个或者多个频道。值得注意的是,订阅者只能接收到订阅之后发布者发布的消息,所以订阅者若想接收发布者的消息,必须在发布者发布消息之前完成订阅。

在redis-cli客户端,发布消息的命令是publish,用法是publish channel message,如向channel.1发布"hi":publish channel.1 hi,对应的jedis方法是Long jedis.publish(String channel, String message);

订阅频道的命令是subscribe,用法是subscribe channel [channel...],如订阅channel.1频道:subscribe channel.1,对应的jedis方法是jedis.subscribe(JedisPubSub jedisPubSub, String... channels),其中JedisPubSub是个抽象类,获得其实例,我们得新建一个它的子类,并重写里面的方法。

除了一般订阅之外还有一种是按照规则订阅,命令是psubscribe,用法同上,后面跟一个或多个频道名,频道名支持glob风格的通配符,常用的有?匹配一个字符,*匹配任意个字符,对应的jedis方法是jedis.psubscribe(JedisPubSub jedisPubSub, String... patterns),同样支持上面的通配符。

subscribe、psubscribe同样会阻塞线程,接收到发布者的消息后还会一直等待发布者发布新的消息。

示例:

class MyJedisPubSub extends JedisPubSub {

public void onMessage(String channel, String message) {
System.out.println("从" + channel + "接收:" + message);
}

public void onSubscribe(String channel, int subscribedChannels) {
System.out.println(channel + "," + subscribedChannels);
}
}

public class TestJedisPubSub {
public static void main(String[] args) {
JedisPoolConfig poolConfig = new JedisPoolConfig();
// 最大连接数
poolConfig.setMaxTotal(2);
// 最大空闲数
poolConfig.setMaxIdle(2);
// 最大允许等待时间,如果超过这个时间还未获取到连接,则会报JedisException异常:
// Could not get a resource from the pool
poolConfig.setMaxWaitMillis(1000);
final JedisPool pool = new JedisPool(poolConfig, "192.168.83.128", 6379);

new Thread(new Runnable() {
Jedis jedis = pool.getResource();

public void run() {
jedis.subscribe(new MyJedisPubSub(), "channel.1");
}
}, "订阅者").start();

new Thread(new Runnable() {
Jedis jedis = pool.getResource();

public void run() {
while (true) {
jedis.publish("channel.1", UUID.randomUUID().toString());
}
}
}, "发布者").start();
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: