您的位置:首页 > 数据库 > Memcache

memcache append 大数据 大字符串 压缩

2016-07-12 19:40 609 查看
关于MemcacheClient中CachedData.flag的说明

0000000000000010==COMPRESSED==2

0000000000000001==SERIALIZED==1

0000000100000000==SPECIAL_BOOLEAN==256

0000001000000000==SPECIAL_INT==512

0000001100000000==SPECIAL_LONG==768

0000010000000000==SPECIAL_DATE==1024

0000010100000000==SPECIAL_BYTE==1280

0000011000000000==SPECIAL_FLOAT==1536

0000011100000000==SPECIAL_DOUBLE==1792

0000100000000000==SPECIAL_BYTEARRAY==2048

0000100100000000==SPECIAL_X_COMPRESSED==2304

1111111100000000==SPECIAL_MASK==65280

从以上二进制可以看出,这个flag分成了高8位与低8位,这两部分各有含意。

低8位,用来实现 "业务含意叠加" 的效果

高8位,用来表示其它普通的状态,共可表示256种状态。

先看一个典型的encode过程,先序列化,再压缩

int flag = 0;//初始化

flag |= SERIALIZED;//flag=1, 二进制,01

flag |= COMPRESSED;//flag=3, 二进制,11

经过这2步后,flag就饱含了2种意思

从右数,第1位是1,表示是SERIALIZED,

从右数,第2位是1,表示是COMPRESSED,

从右数,第1位和第2位,一直到第8位,都是特殊的标识位。

如果想要用这个flag实现 "业务含意叠加" 的效果,flag只能取

1,10,100,1000,10000,100000,1000000,10000000,共8个值,即低8位,每位一个值

再看decode过程

int flag = 3//二进制,11

(flags & COMPRESSED) = 1;//说明是压缩数据,其实比较的是从右数第2个标识位是不是1,其它位是不是0

flags = flags & SPECIAL_MASK;//这一行的作用是抹去低8位(特殊的标识位)

再去判断是什么类型就可以正确的用switch了,比较的是高8位。

============================================= 分隔线 =============================================

我们都知道,memcache性能还是相当不错的,而且还可以设置是否启用压缩。

对于set方法,如果超过压缩阀值,会启用压缩,这没问题。

对于append方法,如果超过压缩阀值,同样会启用压缩,但问题来了,append的数据会有2种不同的类型,一种是启用压缩的,一种是未启用压缩的,那么解压缩时,就会出错了。

好,知道问题原因了,那就fix它。我们知道不管是set还是append,memcache最终存储的数据都是byte1[],我用自定义一种byte2[],与byte1[] 不同的是,我们在byte1[]前面加了两种信息,一种信息是该byte1[]有没有进行压缩,第二种信息是byte1[]的长度,最终把byte2[]存储到memcache。解压时,反向解压即可。

只需要重新定义一下Transcoder和一个自定义类即可。

XCompress.java

package com.collonn.javaUtilMvn.memcache.transcoder;

import java.nio.charset.Charset;

/**
* Created by jelly on 2016-7-8.
*/
public class XCompress {
private byte[] bytes;

public XCompress(String str){
this.bytes = str.getBytes(Charset.forName("UTF-8"));
}

public XCompress(String str, String charSetName){
this.bytes = str.getBytes(Charset.forName(charSetName));
}

public XCompress(byte[] bytes){
this.bytes = bytes;
}

public byte[] getBytes() {
return bytes;
}

public void setBytes(byte[] bytes) {
this.bytes = bytes;
}
}


Transcoder for net.rubyeye.xmemcached.MemcachedClient

package com.collonn.javaUtilMvn.memcache.transcoder;

import net.rubyeye.xmemcached.transcoders.BaseSerializingTranscoder;
import net.rubyeye.xmemcached.transcoders.CachedData;
import net.rubyeye.xmemcached.transcoders.Transcoder;
import net.rubyeye.xmemcached.transcoders.TranscoderUtils;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Date;

/**
* Transcoder that serializes and compresses objects.
*/
public class XTranscoder extends BaseSerializingTranscoder implements
Transcoder<Object> {

public void setPackZeros(boolean packZeros) {
this.transcoderUtils.setPackZeros(packZeros);

}

public void setPrimitiveAsString(boolean primitiveAsString) {
this.primitiveAsString = primitiveAsString;
}

private final int maxSize;

private boolean primitiveAsString;

public final int getMaxSize() {
return this.maxSize;
}

// General flags
public static final int SERIALIZED = 1;
public static final int COMPRESSED = 2;

// Special flags for specially handled types.
public static final int SPECIAL_MASK = 0xff00;
public static final int SPECIAL_BOOLEAN = (1 << 8);
public static final int SPECIAL_INT = (2 << 8);
public static final int SPECIAL_LONG = (3 << 8);
public static final int SPECIAL_DATE = (4 << 8);
public static final int SPECIAL_BYTE = (5 << 8);
public static final int SPECIAL_FLOAT = (6 << 8);
public static final int SPECIAL_DOUBLE = (7 << 8);
public static final int SPECIAL_BYTEARRAY = (8 << 8);
public static final int SPECIAL_XCOMPRESSED = (9 << 8);

private final TranscoderUtils transcoderUtils = new TranscoderUtils(true);

public TranscoderUtils getTranscoderUtils() {
return transcoderUtils;
}

/**
* Get a serializing transcoder with the default max data size.
*/
public XTranscoder() {
this(CachedData.MAX_SIZE);
}

/**
* Get a serializing transcoder that specifies the max data size.
*/
public XTranscoder(int max) {
this.maxSize = max;
}

public boolean isPackZeros() {
return this.transcoderUtils.isPackZeros();
}

public boolean isPrimitiveAsString() {
return this.primitiveAsString;
}

/*
* (non-Javadoc)
*
* @see net.spy.memcached.Transcoder#decode(net.spy.memcached.CachedData)
*/
public final Object decode(CachedData d) {
byte[] data = d.getData();

int flags = d.getFlag();
if ((flags & COMPRESSED) != 0) {
data = decompress(d.getData());
}
flags = flags & SPECIAL_MASK;
return decode0(d,data, flags);
}

protected final Object decodeXCompress(byte[] data) {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
String str = null;

try {
int nextIdx = 0;
int total = data.length;
while (nextIdx < total) {
byte compressedByte = data[nextIdx];

int length = fromBytes(data[nextIdx + 1], data[nextIdx + 2], data[nextIdx + 3], data[nextIdx + 4]);
byte[] dataBlock = Arrays.copyOfRange(data, nextIdx + 5, nextIdx + 5 + length);
if (compressedByte == 1) {
dataBlock = decompress(dataBlock);
}

byteArrayOutputStream.write(dataBlock);
nextIdx += (1 + 4 + length);
}

byte[] deCompressData = byteArrayOutputStream.toByteArray();
str = decodeString(deCompressData);
}catch (Exception e){
log.error("deCompress XCompress error", e);
}finally {
try {
byteArrayOutputStream.close();
} catch (IOException e) {

}
}

return str;
}

protected final Object decode0(CachedData cachedData,byte[] data, int flags) {
Object rv = null;
if ((cachedData.getFlag() & SERIALIZED) != 0 && data != null) {
rv = deserialize(data);
} else {
if (this.primitiveAsString) {
if (flags == 0) {
return decodeString(data);
}
}
if (flags != 0 && data != null) {
switch (flags) {
case SPECIAL_BOOLEAN:
rv = Boolean.valueOf(this.transcoderUtils
.decodeBoolean(data));
break;
case SPECIAL_INT:
rv = Integer.valueOf(this.transcoderUtils.decodeInt(data));
break;
case SPECIAL_LONG:
rv = Long.valueOf(this.transcoderUtils.decodeLong(data));
break;
case SPECIAL_BYTE:
rv = Byte.valueOf(this.transcoderUtils.decodeByte(data));
break;
case SPECIAL_FLOAT:
rv = new Float(Float.intBitsToFloat(this.transcoderUtils
.decodeInt(data)));
break;
case SPECIAL_DOUBLE:
rv = new Double(Double
.longBitsToDouble(this.transcoderUtils
.decodeLong(data)));
break;
case SPECIAL_DATE:
rv = new Date(this.transcoderUtils.decodeLong(data));
break;
case SPECIAL_BYTEARRAY:
rv = data;
break;
case SPECIAL_XCOMPRESSED:
rv = decodeXCompress(data);
break;
default:
log
.warn(String.format("Undecodeable with flags %x",
flags));
}
} else {
rv = decodeString(data);
}
}
return rv;
}

/*
* (non-Javadoc)
*
* @see net.spy.memcached.Transcoder#encode(java.lang.Object)
*/
public final CachedData encode(Object o) {
byte[] b = null;
int flags = 0;

if (o instanceof String) {
b = encodeString((String) o);
} else if (o instanceof Long) {
if (this.primitiveAsString) {
b = encodeString(o.toString());
} else {
b = this.transcoderUtils.encodeLong((Long) o);
}
flags |= SPECIAL_LONG;
} else if (o instanceof Integer) {
if (this.primitiveAsString) {
b = encodeString(o.toString());
} else {
b = this.transcoderUtils.encodeInt((Integer) o);
}
flags |= SPECIAL_INT;
} else if (o instanceof Boolean) {
if (this.primitiveAsString) {
b = encodeString(o.toString());
} else {
b = this.transcoderUtils.encodeBoolean((Boolean) o);
}
flags |= SPECIAL_BOOLEAN;
} else if (o instanceof Date) {
b = this.transcoderUtils.encodeLong(((Date) o).getTime());
flags |= SPECIAL_DATE;
} else if (o instanceof Byte) {
if (this.primitiveAsString) {
b = encodeString(o.toString());
} else {
b = this.transcoderUtils.encodeByte((Byte) o);
}
flags |= SPECIAL_BYTE;
} else if (o instanceof Float) {
if (this.primitiveAsString) {
b = encodeString(o.toString());
} else {
b = this.transcoderUtils.encodeInt(Float
.floatToRawIntBits((Float) o));
}
flags |= SPECIAL_FLOAT;
} else if (o instanceof Double) {
if (this.primitiveAsString) {
b = encodeString(o.toString());
} else {
b = this.transcoderUtils.encodeLong(Double
.doubleToRawLongBits((Double) o));
}
flags |= SPECIAL_DOUBLE;
} else if (o instanceof byte[]) {
b = (byte[]) o;
flags |= SPECIAL_BYTEARRAY;
} else {
if(!(o instanceof XCompress)){
b = serialize(o);
flags |= SERIALIZED;
}
}

assert b != null;
if (this.primitiveAsString) {
// It is not be SERIALIZED,so change it to string type
if ((flags & SERIALIZED) == 0) {
flags = 0;
}
}

if(o instanceof XCompress){
flags = SPECIAL_XCOMPRESSED;
b = processXCompress((XCompress)o);
}else{
if (b.length > this.compressionThreshold) {
byte[] compressed = compress(b);
if (compressed.length < b.length) {
if (log.isDebugEnabled()) {
log.debug("Compressed " + o.getClass().getName() + " from "
+ b.length + " to " + compressed.length);
}
b = compressed;
flags |= COMPRESSED;
} else {
if (log.isDebugEnabled()) {
log.debug("Compression increased the size of "
+ o.getClass().getName() + " from " + b.length
+ " to " + compressed.length);
}
}
}
}

return new CachedData(flags, b, this.maxSize, -1);
}

private byte[] processXCompress(XCompress xCompress){
byte compressed = 1;
byte[] compressedData = compress(xCompress.getBytes());
if(xCompress.getBytes().length < compressedData.length){
compressedData = xCompress.getBytes();
compressed = 0;
}
byte[] compressedDataLength = toByteArray(compressedData.length);

byte[] joinedData = null;
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
try {
byteArrayOutputStream.write(compressed);
byteArrayOutputStream.write(compressedDataLength);
byteArrayOutputStream.write(compressedData);

joinedData = byteArrayOutputStream.toByteArray();
}catch (Exception e){
log.error("compress XCompress error", e);
}finally {
try {
byteArrayOutputStream.close();
} catch (IOException e) {
}
}

return joinedData;
}

private byte[] toByteArray(int value){
return new byte[] {
(byte) (value >> 24),
(byte) (value >> 16),
(byte) (value >> 8),
(byte) value};
}

public int fromBytes(byte b1, byte b2, byte b3, byte b4) {
return b1 << 24 | (b2 & 0xFF) << 16 | (b3 & 0xFF) << 8 | (b4 & 0xFF);
}
}


test code for net.rubyeye.xmemcached.MemcachedClient

package com.collonn.javaUtilMvn.memcache;

import com.collonn.javaUtilMvn.memcache.transcoder.XCompress;
import com.collonn.javaUtilMvn.memcache.transcoder.XTranscoder;
import net.rubyeye.xmemcached.MemcachedClient;
import net.rubyeye.xmemcached.XMemcachedClientBuilder;
import net.rubyeye.xmemcached.command.BinaryCommandFactory;
import net.rubyeye.xmemcached.utils.AddrUtil;

import java.io.IOException;
import java.util.Date;

public class XMemcachedMemTest {
public static MemcachedClient memClient;

static {
try {
XMemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses("127.0.0.1:11211"));
builder.setCommandFactory(new BinaryCommandFactory());
builder.setConnectTimeout(5000);
builder.setOpTimeout(5000);
XTranscoder xTranscoder = new XTranscoder();
xTranscoder.setCompressionThreshold(10);
builder.setTranscoder(xTranscoder);
//            builder.setConnectionPoolSize(10);
//            builder.setSocketOption(StandardSocketOption.SO_RCVBUF, 32 * 1024); // 设置接收缓存区为32K,默认16K
//            builder.setSocketOption(StandardSocketOption.SO_SNDBUF, 16 * 1024); // 设置发送缓冲区为16K,默认为8K
//            builder.setSocketOption(StandardSocketOption.TCP_NODELAY, false); // 启用nagle算法,提高吞吐量,默认关闭
memClient = builder.build();
} catch (IOException e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws Exception {
try {
String key = "Jelly-key-" + new Date().getTime();

String v1 = "Welcome to Fabric’s documentation.";
memClient.set(key, 20, new XCompress(v1));
String v2 = "Usage documentation";
memClient.append(key, new XCompress(v2));
String value = memClient.get(key);
System.out.println("final data as follows:\n" + value);
}catch (Exception e){
e.printStackTrace();
}finally {
if(memClient != null){
memClient.shutdown();
}
}
}
}


Transcoder for net.spy.memcached.MemcachedClient
package com.collonn.javaUtilMvn.memcache.transcoder;

import net.spy.memcached.CachedData;
import net.spy.memcached.transcoders.BaseSerializingTranscoder;
import net.spy.memcached.transcoders.Transcoder;
import net.spy.memcached.transcoders.TranscoderUtils;
import net.spy.memcached.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Date;

/**
* Transcoder that serializes and compresses objects.
*/
public class SpyTranscoder extends BaseSerializingTranscoder implements Transcoder<Object> {
protected static final Logger log = LoggerFactory.getLogger(SpyTranscoder.class);

// General flags
static final int SERIALIZED = 1;
static final int COMPRESSED = 2;

// Special flags for specially handled types.
private static final int SPECIAL_MASK = 0xff00;
static final int SPECIAL_BOOLEAN = (1 << 8);
static final int SPECIAL_INT = (2 << 8);
static final int SPECIAL_LONG = (3 << 8);
static final int SPECIAL_DATE = (4 << 8);
static final int SPECIAL_BYTE = (5 << 8);
static final int SPECIAL_FLOAT = (6 << 8);
static final int SPECIAL_DOUBLE = (7 << 8);
static final int SPECIAL_BYTEARRAY = (8 << 8);
static final int SPECIAL_XCOMPRESSED = (9 << 8);

private final TranscoderUtils tu = new TranscoderUtils(true);

/**
* Get a serializing transcoder with the default max data size.
*/
public SpyTranscoder() {
this(CachedData.MAX_SIZE);
}

/**
* Get a serializing transcoder that specifies the max data size.
*/
public SpyTranscoder(int max) {
super(max);
}

/*
* (non-Javadoc)
*
* @see net.spy.memcached.Transcoder#decode(net.spy.memcached.CachedData)
*/
public Object decode(CachedData d) {
byte[] data = d.getData();
Object rv = null;
if ((d.getFlags() & COMPRESSED) != 0) {
data = decompress(d.getData());
}
int flags = d.getFlags() & SPECIAL_MASK;
if ((d.getFlags() & SERIALIZED) != 0 && data != null) {
rv = deserialize(data);
} else if (flags != 0 && data != null) {
switch (flags) {
case SPECIAL_BOOLEAN:
rv = Boolean.valueOf(tu.decodeBoolean(data));
break;
case SPECIAL_INT:
rv = Integer.valueOf(tu.decodeInt(data));
break;
case SPECIAL_LONG:
rv = Long.valueOf(tu.decodeLong(data));
break;
case SPECIAL_DATE:
rv = new Date(tu.decodeLong(data));
break;
case SPECIAL_BYTE:
rv = Byte.valueOf(tu.decodeByte(data));
break;
case SPECIAL_FLOAT:
rv = new Float(Float.intBitsToFloat(tu.decodeInt(data)));
break;
case SPECIAL_DOUBLE:
rv = new Double(Double.longBitsToDouble(tu.decodeLong(data)));
break;
case SPECIAL_BYTEARRAY:
rv = data;
break;
case SPECIAL_XCOMPRESSED:
rv = decodeXCompress(data);
break;
default:
getLogger().warn("Undecodeable with flags %x", flags);
}
} else {
rv = decodeString(data);
}
return rv;
}

protected final Object decodeXCompress(byte[] data) {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
String str = null;

try {
int nextIdx = 0;
int total = data.length;
while (nextIdx < total) {
byte compressedByte = data[nextIdx];

int length = fromBytes(data[nextIdx + 1], data[nextIdx + 2], data[nextIdx + 3], data[nextIdx + 4]);
byte[] dataBlock = Arrays.copyOfRange(data, nextIdx + 5, nextIdx + 5 + length);
if (compressedByte == 1) {
dataBlock = decompress(dataBlock);
}

byteArrayOutputStream.write(dataBlock);
nextIdx += (1 + 4 + length);
}

byte[] deCompressData = byteArrayOutputStream.toByteArray();
str = decodeString(deCompressData);
}catch (Exception e){
log.error("deCompress XCompress error", e);
}finally {
try {
byteArrayOutputStream.close();
} catch (IOException e) {

}
}

return str;
}

/*
* (non-Javadoc)
*
* @see net.spy.memcached.Transcoder#encode(java.lang.Object)
*/
public CachedData encode(Object o) {
byte[] b = null;
int flags = 0;
if (o instanceof String) {
b = encodeString((String) o);
if (StringUtils.isJsonObject((String) o)) {
return new CachedData(flags, b, getMaxSize());
}
} else if (o instanceof Long) {
b = tu.encodeLong((Long) o);
flags |= SPECIAL_LONG;
} else if (o instanceof Integer) {
b = tu.encodeInt((Integer) o);
flags |= SPECIAL_INT;
} else if (o instanceof Boolean) {
b = tu.encodeBoolean((Boolean) o);
flags |= SPECIAL_BOOLEAN;
} else if (o instanceof Date) {
b = tu.encodeLong(((Date) o).getTime());
flags |= SPECIAL_DATE;
} else if (o instanceof Byte) {
b = tu.encodeByte((Byte) o);
flags |= SPECIAL_BYTE;
} else if (o instanceof Float) {
b = tu.encodeInt(Float.floatToRawIntBits((Float) o));
flags |= SPECIAL_FLOAT;
} else if (o instanceof Double) {
b = tu.encodeLong(Double.doubleToRawLongBits((Double) o));
flags |= SPECIAL_DOUBLE;
} else if (o instanceof byte[]) {
b = (byte[]) o;
flags |= SPECIAL_BYTEARRAY;
} else {
if(!(o instanceof XCompress)){
b = serialize(o);
flags |= SERIALIZED;
}
}

assert b != null;

if(o instanceof XCompress){
flags = SPECIAL_XCOMPRESSED;
b = processXCompress((XCompress)o);
}else{
if (b.length > compressionThreshold) {
byte[] compressed = compress(b);
if (compressed.length < b.length) {
getLogger().debug("Compressed %s from %d to %d",
o.getClass().getName(), b.length, compressed.length);
b = compressed;
flags |= COMPRESSED;
} else {
getLogger().info("Compression increased the size of %s from %d to %d",
o.getClass().getName(), b.length, compressed.length);
}
}
}

return new CachedData(flags, b, getMaxSize());
}

private byte[] processXCompress(XCompress xCompress){
byte compressed = 1;
byte[] compressedData = compress(xCompress.getBytes());
if(xCompress.getBytes().length < compressedData.length){
compressedData = xCompress.getBytes();
compressed = 0;
}
byte[] compressedDataLength = toByteArray(compressedData.length);

byte[] joinedData = null;
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
try {
byteArrayOutputStream.write(compressed);
byteArrayOutputStream.write(compressedDataLength);
byteArrayOutputStream.write(compressedData);

joinedData = byteArrayOutputStream.toByteArray();
}catch (Exception e){
log.error("compress XCompress error", e);
}finally {
try {
byteArrayOutputStream.close();
} catch (IOException e) {
}
}

return joinedData;
}

private byte[] toByteArray(int value){
return new byte[] {
(byte) (value >> 24),
(byte) (value >> 16),
(byte) (value >> 8),
(byte) value};
}

public int fromBytes(byte b1, byte b2, byte b3, byte b4) {
return b1 << 24 | (b2 & 0xFF) << 16 | (b3 & 0xFF) << 8 | (b4 & 0xFF);
}
}


test code for net.spy.memcached.MemcachedClient

package com.collonn.javaUtilMvn.memcache;

import com.collonn.javaUtilMvn.memcache.transcoder.SpyTranscoder;
import com.collonn.javaUtilMvn.memcache.transcoder.XCompress;
import net.spy.memcached.ConnectionFactory;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.MemcachedClient;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class SpyMemcachedMemTest {
public static MemcachedClient memClient;

static {
try {
ConnectionFactoryBuilder builder = new ConnectionFactoryBuilder();
SpyTranscoder spyTranscoder = new SpyTranscoder();
spyTranscoder.setCompressionThreshold(10);
builder.setTranscoder(spyTranscoder);
ConnectionFactory connectionFactory = builder.build();
List<InetSocketAddress> serverList = new ArrayList<>();
serverList.add(new InetSocketAddress("127.0.0.1", 11211));
memClient = new MemcachedClient(connectionFactory, serverList);
} catch (IOException e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws Exception {
try {
String key = "Jelly-key-" + new Date().getTime();

String v1 = "Welcome to Fabric’s documentation.";
memClient.set(key, 20, new XCompress(v1));
String v2 = "Usage documentation";
memClient.append(key, new XCompress(v2));
String value = (String)memClient.get(key);
System.out.println("final data as follows:\n" + value);
}catch (Exception e){
e.printStackTrace();
}finally {
if(memClient != null){
memClient.shutdown();
}
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息