您的位置:首页 > 数据库 > Redis

HyperLogLog-Redis中的基数统计算法

2017-12-13 16:45 746 查看





1.基本概念

基数(cardinality),是指一个集合中不同元素的个数。例如集合:{1,2,3,4,5,2,3,9,7}, 这个集合有9个元素,但是2和3各出现了两次,

因此不重复的元素为1,2,3,4,5,9,7,所以这个集合的基数是7。

Redis 在 2.8.9 版本添加了 HyperLogLog 结构。HyperLogLog 是用来做基数统计的算法,HyperLogLog 的优点是,在输入元素的

数量或者体积非常非常大时,计算基数所需的空间总是固定 的、并且是很小的。在 Redis 里面,每个 HyperLogLog 键只需要花费

12 KB 内存,就可以计算接近 2^64 个不同元素的基 数。这和计算基数时,元素越多耗费内存就越多的集合形成鲜明对比。

但是,因为 HyperLogLog 只会根据输入元素来计算基数,而不会储存输入元素本身,所以 HyperLogLog 不能像集合那样,返回输

入的各个元素。

Hyper LogLog通过对一个输入数据流M,应用一个哈希函数设置h(M)来工作。这将产生一个S = h(M) of {0,1}^∞字符串的可观测结果。

通过分割哈希输入流成m个子字符串,并对每个子输入流保持m的值可观测 ,这就是相当一个新Hyper LogLog(一个子m就是一个新

的Hyper LogLog)。利用额外的观测值的平均值,产生一个计数器,其精度随着m的增长而提高,这只需要对输入集合中的每个元素

执行几步操作就可以完成。


2.算法框架




3.算法推导和证明

hyperloglog算法背后是一些复杂的概率和统计知识,感兴趣的看下方的论文。




4.java实现

实现参考redis的源码(hyperloglog.c),进行了java实现。

murmurhash用来对输入的集合元素进行hash,并产生均匀分布的hash结果。
public class MurmurHash {
/**
* murmur hash算法实现
*/
public static long hash64(byte[] key) {

ByteBuffer buf = ByteBuffer.wrap(key);
int seed = 0x1234ABCD;

ByteOrder byteOrder = buf.order();
buf.order(ByteOrder.LITTLE_ENDIAN);

long m = 0xc6a4a7935bd1e995L;
int r = 47;

long h = seed ^ (buf.remaining() * m);

long k;
while (buf.remaining() >= 8) {
k = buf.getLong();

k *= m;
k ^= k >>> r;
k *= m;

h ^= k;
h *= m;
}

if (buf.remaining() > 0) {
ByteBuffer finish = ByteBuffer.allocate(8).order(
ByteOrder.LITTLE_ENDIAN);
// for big-endian version, do this first:
// finish.position(8-buf.remaining());
finish.put(buf).rewind();
h ^= finish.getLong();
h *= m;
}

h ^= h >>> r;
h *= m;
h ^= h >>> r;

buf.order(byteOrder);
return h;
}
}

hyperloglog实现
public class HyperLogLog {
private static final int HLL_P = 14;//64位hash值中标记分组索引的bit数量,分组越多误差越小,但占用的空间越大
private static final int HLL_REGISTERS = 1 << HLL_P;//总的分组数量
private static final int HLL_BITS = 6;//为保存每一个分组中最大起始0统计量,所需要的bit数量
private static final int HLL_REGISTER_MASK = (1 << HLL_BITS) - 1;//统计量的6位掩码
/**
* bitmap存储格式,采用小端存储,先存储最低有效位,然后存储最高有效位
* +--------+--------+--------+------//      //--+
* |11000000|22221111|33333322|55444444 ....     |
* +--------+--------+--------+------//      //--+
*/
private byte[] registers;

public HyperLogLog() {
//12288+1(12k)个字节,最后一个额外的字节相当于结束符,并没有实际用途
registers = new byte[(HLL_REGISTERS * HLL_BITS + 7) / 8 + 1];
}

//alpha系数,来自参考论文
private double alpha(int m) {
switch (m) {
case 16:
return 0.673;
case 32:
return 0.697;
case 64:
return 0.709;
default:
return 0.7213 / (1 + 1.079 / m);
}
}

//保存第index分组的值为val
private void setRegister(int index, int val) {
int _byte = index * HLL_BITS / 8;
int _fb = index * HLL_BITS & 7;
int _fb8 = 8 - _fb;
registers[_byte] &= ~(HLL_REGISTER_MASK << _fb);
registers[_byte] |= val << _fb;
registers[_byte + 1] &= ~(HLL_REGISTER_MASK >> _fb8);
registers[_byte + 1] |= val >> _fb8;
}
//读取第index分组的值
private int getRegister(int index) {
int _byte = index * HLL_BITS / 8;
int _fb = index * HLL_BITS & 7;
int _fb8 = 8 - _fb;
int b0 = registers[_byte] & 0xff;
int b1 = registers[_byte + 1] & 0xff;
return ((b0 >> _fb) | (b1 << _fb8)) & HLL_REGISTER_MASK;
}

public int hllAdd(int number) {
long hash = MurmurHash.hash64(Integer.toString(number).getBytes());
long index = hash >>> (64 - HLL_P);
int oldcount = getRegister((int) index);

//计算hash值中从HLL_P为开始的连续0数量,包括最后一个1
hash |= 1l;
long bit = 1l << (63 - HLL_P);
int count = 1;
while ((hash & bit) == 0l) {
count++;
bit >>= 1l;
}

if (count > oldcount) {
setRegister((int) index, count);
return 1;
} else {
return 0;
}
}

//估算基数
public long hllCount() {
//计算各分组统计量的调和平均数,SUM(2^-reg)
double E = 0;
int ez = 0;
double m = HLL_REGISTERS;
for (int i = 0; i < HLL_REGISTERS; i++) {
int reg = getRegister(i);
if (reg == 0) {
ez++;
} else {
E += 1.0d / (1l << reg);
}
}
E += ez;

E = 1 / E * alpha((int) m) * m * m;

if (E < m * 2.5 && ez != 0) {
E = m * Math.log(m / ez);
} else if (m == 16384 && E < 72000) {
//来自redis源码
double bias = 5.9119e-18 * E * E * E * E
- 1.4253e-12 * E * E * E
+ 1.2940e-7 * E * E
- 5.2921e-3 * E
+ 83.3216;
E -= E * (bias / 100);
}
return (long) E;
}
}

测试
public class Test {
//测试n个元素的集合
public static void testHyperLogLog(int n) {
System.out.println("n = " + n);
HyperLogLog hyperLogLog = new HyperLogLog();
Set<Integer> s = new HashSet<>();
Random random = new Random();
for (int i = 0; i < n; i++) {
int number = random.nextInt();
hyperLogLog.hllAdd(number);
s.add(number);
}

System.out.println("hyperLogLog count = " + hyperLogLog.hllCount());
System.out.println("hashset count = " + s.size());
System.out.println("error rate = " + Math.abs((double) hyperLogLog.hllCount() / s.size() - 1));
}

public static void main(String[] args) {
int n = 1;
for (int i = 0; i < 9; i++) {
n *= 10;
testHyperLogLog(n);
}
}
}

5.测试效果

n为产生的随即元素总个数,第二行hyperLogLog count为hyperLogLog算法估计的基数,hashset count为使用hashset统计出的精确结果,error rate为错误率。

可以看出大部分情况下hyperloglog算法的错误率都在1%以内,当元素总个数达到1亿时,hashset报出异常。
n = 10
hyperLogLog count = 10
hashset count = 10
error rate = 0.0
n = 100
hyperLogLog count = 100
hashset count = 100
error rate = 0.0
n = 1000
hyperLogLog count = 1002
hashset count = 1000
error rate = 0.0020000000000000018
n = 10000
hyperLogLog count = 9974
hashset count = 10000
error rate = 0.0026000000000000467
n = 100000
hyperLogLog count = 100721
hashset count = 99999
error rate = 0.007220072200722072
n = 1000000
hyperLogLog count = 990325
hashset count = 999883
error rate = 0.00955911841685475
n = 10000000
hyperLogLog count = 9966476
hashset count = 9988334
error rate = 0.002188352932531057
n = 100000000
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at java.util.HashMap.resize(HashMap.java:703)
at java.util.HashMap.putVal(HashMap.java:662)
at java.util.HashMap.put(HashMap.java:611)
at java.util.HashSet.add(HashSet.java:219)
at com.sankuai.alg.Test.testHyperLogLog(Test.java:24)
at com.sankuai.alg.Test.main(Test.java:36)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

Process finished with exit code 1

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: