Memcache CAS协议介绍及使用
2016-10-25 11:30
337 查看
1.什么是CAS
所谓CAS,check and set,在写操作时,先检查是否被别的线程修改过。
基本原理非常简单,一言以蔽之,就是“版本号”。每个存储的数据对象,多有一个版本号。我们可以从下面的例子来理解:
如果不采用CAS,则有如下的情景:
第一步,A取出数据对象X;
第二步,B取出数据对象X;
第三步,B修改数据对象X,并将其放入缓存;
第四步,A修改数据对象X,并将其放入缓存。
我们可以发现,第四步中会产生数据写入冲突。
如果采用CAS协议,则是如下的情景。
第一步,A取出数据对象X,并获取到CAS-ID1;
第二步,B取出数据对象X,并获取到CAS-ID2;
第三步,B修改数据对象X,在写入缓存前,检查CAS-ID与缓存空间中该数据的CAS-ID是否一致。结果是“一致”,就将修改后的带有CAS-ID2的X写入到缓存。
第四步,A修改数据对象Y,在写入缓存前,检查CAS-ID与缓存空间中该数据的CAS-ID是否一致。结果是“不一致”,则拒绝写入,返回存储失败。
这样CAS协议就用了“版本号”的思想,解决了冲突问题。
参考:http://blog.csdn.net/poechant/article/details/7082189
2.非CAS,模拟多客户端并发场景
3.CAS 多线程使用场景
当写入缓存后,如果立刻就有其他客户端进行读操作,则会读取失败,因为set是异步操作(async),很可能仍还没有写入完。
MemcachedClient cache = new MemcachedClient(cacheServerAddr);
cache.set("key", 3600, bigData);
return cache.get("key");
一种可行的方法,是采用同步写操作。常用的set方法没有这种方式,需要采用遵守Memcached的CAS(Check And Set)协议的写操作。而这种写操作,一般是基于读取后得到CAS ID(类似于SVN中的版本ID),根据这个CAS ID来保证写入时,没有和其他写入操作产生“写重复”冲突。因此,在我们现在所讨论的场景中,可以如下使用CAS协议:
(1)初始写入:写一个简单的初始值(set,异步操作);
(2)获取版本:使用异步方式获取CAS ID;
(3)同步写入:以同步方式写入数据,保证在读取前,已经写入结束。
MemcachedClient cache = new MemcachedClient(cacheServerAddr);
cache.set("key", 3600, "");
long casId = cache.asyncGets("key").get().getCas();
cache.cas("key", casid, bigData);
return cache.get("key");
以这种“Set-Asyncgets-Cas”方式的缓存异步实时读写问题的解决方案,我们称之为"SAC"。
什么是CAS协议
Memcached于1.2.4版本新增CAS(Check and Set)协议类同于Java并发的CAS(Compare and Swap)原子操作,处理同一item被多个线程更改过程的并发问题。
在Memcached中,每个key关联有一个64-bit长度的long型惟一数值,表示该key对应value的版本号。这个数值由Memcached server产生,从1开始,且同一Memcached server不会重复。在两种情况下这个版本数值会加1:1、新增一个key-value对;2、对某已有key对应的value值更新成功。删除item版本值不会减小。
例如
Java代码
MemcachedClient client = new MemcachedClient();
client.set("fKey", "fValue");
//第一次set, 在Memcached server中会维护fKey对应的value的版本号,假设是548;
client.set("fKey", "sValue");
//再次set,则这个fKey对应的value的版本号变为549;
CASValue casValue = client.gets("fKey");
//这样就可以得到对应key的cas版本号和实际value(各个Memcached client都有类似的对象表示,名字可能不一样,但效果类同),如 casValue.getValue = "sValue",casValue.getCas=549;
CAS协议解决的问题
模拟多个Memcached client并发set同一个key的场景。如clientA想把当前key的value set为"x",且操作成功;clientB却把当前key的value值由"x"覆盖set为"y",这时clientA再根据key去取value时得到"y"而不是期望的"x",它使用这个值,但不知道这个值已经被其它线程修改过,就可能会出现问题。
CAS协议解决这种并发修改问题。有线程试图修改当前key-value对的value时,先由gets方法得到item的版本号,操作完成提交数据时,使用cas方法谨慎变更,如果在本地对item操作过程中这个key-value对在Memcached server端被其它线程更改过,就放弃此次修改(乐观锁概念)。
Java代码
CASValue casValue = client.gets(key);
//*****
//本地的各种处理
//*****
CASResponse response = client.cas(key, newValue, casValue);
//在我取数据时item的版本号是casValue.getCas(),所以提交时我期望item的版本号是没有改变过的。如果被修改过,不是我取数据时的版本号,那么Memcached server对这次提交什么也不做,返回true或false由用户自己来提出解决方案(什么也不做或是重新获取版本号,再次重试提交等)
并发环境下的正确性验证
用多个Memcached client并发更改同一个key值,将value递增,如果 操作次数-CAS失败次数 = value增加的值,表示并发环境下CAS处理没有问题。
Java代码
import java.io.IOException;
import java.net.InetSocketAddress;
import net.spy.memcached.CASResponse;
import net.spy.memcached.CASValue;
import net.spy.memcached.MemcachedClient;
public class CASTest {
private static MemcachedClient client = null;
static {
try {
client = new MemcachedClient(
new InetSocketAddress("localhost", 11211));
} catch (IOException o) {
o.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
//Firstly, the key should exist.
//key is "number", value is Integer 1, 7845 is expire time
client.set("number", 7845, 1);
CASTest testObj = new CASTest();
//start the multithread environment
for (int i = 0; i < 10; i++) {
testObj.new ThreadTest("Thread-" + (i + 1)).start();
}
}
/**
* Each thread runs many times
*/
private class ThreadTest extends Thread {
private MemcachedClient client = null;
ThreadTest(String name) throws IOException {
super(name);
client = new MemcachedClient(
new InetSocketAddress("localhost", 11211));
}
public void run() {
int i = 0;
int success = 0;
while (i < 10) {
i++;
CASValue<Object> uniqueValue =client.gets("number");
CASResponse response = client.cas("number",
uniqueValue.getCas(), (Integer)uniqueValue.getValue() + 1);
if (response.toString().equals("OK")) {
success++;
}
System.out.println(Thread.currentThread().getName() + " " + i
+ " time " + " update oldValue : " + uniqueValue
+ " , result : " + response);
}
if (success < 10) {
System.out.println(Thread.currentThread().getName()
+ " unsuccessful times : " + (10 - success ));
}
}
}
}
每次执行的结果都会不一样,如其中某次的执行结果为: 总共操作100次,冲突47次,且最后value由1涨到53,那么表示验证成功。
不足之处,诚请提出!
所谓CAS,check and set,在写操作时,先检查是否被别的线程修改过。
基本原理非常简单,一言以蔽之,就是“版本号”。每个存储的数据对象,多有一个版本号。我们可以从下面的例子来理解:
如果不采用CAS,则有如下的情景:
第一步,A取出数据对象X;
第二步,B取出数据对象X;
第三步,B修改数据对象X,并将其放入缓存;
第四步,A修改数据对象X,并将其放入缓存。
我们可以发现,第四步中会产生数据写入冲突。
如果采用CAS协议,则是如下的情景。
第一步,A取出数据对象X,并获取到CAS-ID1;
第二步,B取出数据对象X,并获取到CAS-ID2;
第三步,B修改数据对象X,在写入缓存前,检查CAS-ID与缓存空间中该数据的CAS-ID是否一致。结果是“一致”,就将修改后的带有CAS-ID2的X写入到缓存。
第四步,A修改数据对象Y,在写入缓存前,检查CAS-ID与缓存空间中该数据的CAS-ID是否一致。结果是“不一致”,则拒绝写入,返回存储失败。
这样CAS协议就用了“版本号”的思想,解决了冲突问题。
参考:http://blog.csdn.net/poechant/article/details/7082189
2.非CAS,模拟多客户端并发场景
package cn.slimsmart.memcache.demo.test.cas; import java.net.InetSocketAddress; import net.spy.memcached.MemcachedClient; import net.spy.memcached.internal.OperationFuture; /** * 非cas测试 */ public class NoCASTest { public static void main(String[] args) throws Exception{ MemcachedClient cache = new MemcachedClient(new InetSocketAddress("192.168.100.110", 11211)); cache.set("key", 1800, "key"); new MThread("thread--1", "X").start(); new MThread("thread--2","Y").start(); } } class MThread extends Thread { private MemcachedClient cache; private String value; public MThread(String name,String value) throws Exception{ super(name); cache = new MemcachedClient(new InetSocketAddress("192.168.100.110", 11211)); this.value=value; } @Override public void run() { OperationFuture<Boolean> future = cache.set("key", 1800, value); System.out.println(Thread.currentThread().getName()+":"+future.getStatus()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+":"+cache.get("key")); } }运行结果,时而分会X,时而分会Y,也可以为key,不定的结果。
3.CAS 多线程使用场景
package cn.slimsmart.memcache.demo.test.cas; import java.net.InetSocketAddress; import net.spy.memcached.CASResponse; import net.spy.memcached.CASValue; import net.spy.memcached.MemcachedClient; /** * cas测试 */ public class CASTest { public static void main(String[] args) throws Exception { MemcachedClient cache = new MemcachedClient(new InetSocketAddress("192.168.100.110", 11211)); cache.set("number", 1800, "number"); for (int i = 0; i < 2; i++) { new CASThread("thread--" + i, "number" + i).start(); } } } class CASThread extends Thread { private MemcachedClient cache; private String value; public CASThread(String name, String value) throws Exception { super(name); cache = new MemcachedClient(new InetSocketAddress("192.168.100.110", 11211)); this.value = value; } @Override public void run() { int i = 0; while (i < 5) { i++; CASValue<Object> uniqueValue = cache.gets("number"); CASResponse response = cache.cas("number", uniqueValue.getCas(), value); if (response.toString().equals("OK")) { System.out.println(Thread.currentThread().getName() + " update success:"+response); }else{ System.out.println(Thread.currentThread().getName() + " update fail:"+response); } } } }4.异步实时读写问题的解决方案SAC
当写入缓存后,如果立刻就有其他客户端进行读操作,则会读取失败,因为set是异步操作(async),很可能仍还没有写入完。
MemcachedClient cache = new MemcachedClient(cacheServerAddr);
cache.set("key", 3600, bigData);
return cache.get("key");
一种可行的方法,是采用同步写操作。常用的set方法没有这种方式,需要采用遵守Memcached的CAS(Check And Set)协议的写操作。而这种写操作,一般是基于读取后得到CAS ID(类似于SVN中的版本ID),根据这个CAS ID来保证写入时,没有和其他写入操作产生“写重复”冲突。因此,在我们现在所讨论的场景中,可以如下使用CAS协议:
(1)初始写入:写一个简单的初始值(set,异步操作);
(2)获取版本:使用异步方式获取CAS ID;
(3)同步写入:以同步方式写入数据,保证在读取前,已经写入结束。
MemcachedClient cache = new MemcachedClient(cacheServerAddr);
cache.set("key", 3600, "");
long casId = cache.asyncGets("key").get().getCas();
cache.cas("key", casid, bigData);
return cache.get("key");
以这种“Set-Asyncgets-Cas”方式的缓存异步实时读写问题的解决方案,我们称之为"SAC"。
什么是CAS协议
Memcached于1.2.4版本新增CAS(Check and Set)协议类同于Java并发的CAS(Compare and Swap)原子操作,处理同一item被多个线程更改过程的并发问题。
在Memcached中,每个key关联有一个64-bit长度的long型惟一数值,表示该key对应value的版本号。这个数值由Memcached server产生,从1开始,且同一Memcached server不会重复。在两种情况下这个版本数值会加1:1、新增一个key-value对;2、对某已有key对应的value值更新成功。删除item版本值不会减小。
例如
Java代码
MemcachedClient client = new MemcachedClient();
client.set("fKey", "fValue");
//第一次set, 在Memcached server中会维护fKey对应的value的版本号,假设是548;
client.set("fKey", "sValue");
//再次set,则这个fKey对应的value的版本号变为549;
CASValue casValue = client.gets("fKey");
//这样就可以得到对应key的cas版本号和实际value(各个Memcached client都有类似的对象表示,名字可能不一样,但效果类同),如 casValue.getValue = "sValue",casValue.getCas=549;
CAS协议解决的问题
模拟多个Memcached client并发set同一个key的场景。如clientA想把当前key的value set为"x",且操作成功;clientB却把当前key的value值由"x"覆盖set为"y",这时clientA再根据key去取value时得到"y"而不是期望的"x",它使用这个值,但不知道这个值已经被其它线程修改过,就可能会出现问题。
CAS协议解决这种并发修改问题。有线程试图修改当前key-value对的value时,先由gets方法得到item的版本号,操作完成提交数据时,使用cas方法谨慎变更,如果在本地对item操作过程中这个key-value对在Memcached server端被其它线程更改过,就放弃此次修改(乐观锁概念)。
Java代码
CASValue casValue = client.gets(key);
//*****
//本地的各种处理
//*****
CASResponse response = client.cas(key, newValue, casValue);
//在我取数据时item的版本号是casValue.getCas(),所以提交时我期望item的版本号是没有改变过的。如果被修改过,不是我取数据时的版本号,那么Memcached server对这次提交什么也不做,返回true或false由用户自己来提出解决方案(什么也不做或是重新获取版本号,再次重试提交等)
并发环境下的正确性验证
用多个Memcached client并发更改同一个key值,将value递增,如果 操作次数-CAS失败次数 = value增加的值,表示并发环境下CAS处理没有问题。
Java代码
import java.io.IOException;
import java.net.InetSocketAddress;
import net.spy.memcached.CASResponse;
import net.spy.memcached.CASValue;
import net.spy.memcached.MemcachedClient;
public class CASTest {
private static MemcachedClient client = null;
static {
try {
client = new MemcachedClient(
new InetSocketAddress("localhost", 11211));
} catch (IOException o) {
o.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
//Firstly, the key should exist.
//key is "number", value is Integer 1, 7845 is expire time
client.set("number", 7845, 1);
CASTest testObj = new CASTest();
//start the multithread environment
for (int i = 0; i < 10; i++) {
testObj.new ThreadTest("Thread-" + (i + 1)).start();
}
}
/**
* Each thread runs many times
*/
private class ThreadTest extends Thread {
private MemcachedClient client = null;
ThreadTest(String name) throws IOException {
super(name);
client = new MemcachedClient(
new InetSocketAddress("localhost", 11211));
}
public void run() {
int i = 0;
int success = 0;
while (i < 10) {
i++;
CASValue<Object> uniqueValue =client.gets("number");
CASResponse response = client.cas("number",
uniqueValue.getCas(), (Integer)uniqueValue.getValue() + 1);
if (response.toString().equals("OK")) {
success++;
}
System.out.println(Thread.currentThread().getName() + " " + i
+ " time " + " update oldValue : " + uniqueValue
+ " , result : " + response);
}
if (success < 10) {
System.out.println(Thread.currentThread().getName()
+ " unsuccessful times : " + (10 - success ));
}
}
}
}
每次执行的结果都会不一样,如其中某次的执行结果为: 总共操作100次,冲突47次,且最后value由1涨到53,那么表示验证成功。
不足之处,诚请提出!
相关文章推荐
- Memcache CAS协议介绍及使用
- CAS协议介绍和在apache使用cas实现单点登录sso
- Memcache的使用和协议分析详解
- [原创]Memcache的使用和协议分析详解
- Memcache的使用和协议分析详解
- Memcache的使用和协议分析详解
- Socket网络协议简单介绍和使用
- Gnutella协议的相关介绍(Peercast实现P2P传输所使用的协议)
- JSON-RPC轻量级远程调用协议介绍及使用
- 消息处理利器 ActiveMQ 的介绍 & Stomp 协议的使用
- JSON-RPC轻量级远程调用协议介绍及使用
- 软件使用协议-介绍
- Memcache的使用和协议分析详解
- [中间件] 消息处理利器 ActiveMQ 的介绍 & Stomp 协议的使用
- Memcache的使用和协议分析详解
- Memcache的使用和协议分析详解
- Memcache的使用和协议分析详解
- Memcache的使用和协议分析详解
- Memcache(MC)系列(一)Memcache介绍、使用、存储、算法、优化