您的位置:首页 > 运维架构

【转载】"一致性hash"算法与server列表维护(备忘)

2016-01-22 09:45 253 查看
普通的hash算法有个很大的问题:当hash的"模数"发生变化时,整个hash数据结构就需要重新hash,重新hash之后的数据分布一定会和hash之前的不同;在很多场景下,"模数"的变化时必然的,但是这种"数据分布"的巨大变化却会带来一些麻烦.所以,就有了"一致性hash",当然学术界对"一致性hash"的阐述,还远远不止这些.
    在编程应用方面,最直观的例子就是"分布式缓存",一个cache集群中,有N台物理server,为了提升单台server的支撑能力,可能会考虑将数据通过hash的方式相对均匀的分布在每个server上.
    判定方式: location = hashcode(key) % N;事实上,由于需要,N可能会被增加或者削减,不管程序上是否能够妥善的支持N的变更,单从"数据迁移"的面积而言,也是非常大的.
    一致性Hash很巧妙的简化了这个问题,同时再使用"虚拟节点"的方式来细分数据的分布.



 

F1
    图示中表名,有2个物理server节点,每个物理server节点有多个Snode虚拟节点,server1和server2的虚拟节点互相"穿插"且依次排列,每个snode都有一个code,它表示接受数据的hashcode起始值(或终止值),比如上述图示中第一个snode.code为500,即当一个数据的hashcode值在[0,500]时则会被存储在它上.
    引入虚拟节点之后,事情就会好很多;假如KEY1分布在Snode3上,snode3事实为物理server1,当server1故障后,snode2也将被移除,那么KEY1将会被分布在"临近的"虚拟节点上--snode2(或者snode4,由实现而定);无论是存取,下一次对KEY1的操作将会有snode2(或snode4)来服务.
    1) 一个物理server在逻辑上分成N个虚拟节点(本例中为256个)
    2) 多个物理server的虚拟节点需要散列分布,即互相"穿插".
    3) 所有的虚拟节点,在逻辑上形成一个链表
    4) 每个虚拟节点,负责一定区间的hashcode值.
 

Java代码  


import java.net.InetSocketAddress;  

import java.net.Socket;  

import java.net.SocketAddress;  

import java.nio.charset.Charset;  

import java.security.MessageDigest;  

import java.security.NoSuchAlgorithmException;  

import java.util.Map;  

import java.util.NavigableMap;  

import java.util.TreeMap;  

  

  

public class ServersPool {  

  

    private static final int VIRTUAL_NODES_NUMBER = 256;//物理节点对应的虚拟节点的个数  

    private static final String TAG = ".-vtag-.";  

    private NavigableMap<Long, SNode> innerPool = new TreeMap<Long, SNode>();  

    private Hashing hashing = new Hashing();  

  

    /** 

     * 新增物理server地址 

     * @param address 

     * @param  weight 

     * 权重,权重越高,其虚拟节点的个数越高,事实上没命中的概率越高 

     * @throws Exception 

     */  

    public synchronized void addServer(String address,int weight) throws Exception {  

        SNode prev = null;  

        SNode header = null;  

        String[] tmp = address.split(":");  

        InnerServer server = new InnerServer(tmp[0], Integer.parseInt(tmp[1]));  

        server.init();  

        //将一个address下的所有虚拟节点SNode形成链表,可以在removeServer,以及  

        //特殊场景下使用  

        int max = 1;  

        if(weight > 0){  

            max = VIRTUAL_NODES_NUMBER * weight;  

        }  

        for (int i = 0; i < max; i++) {  

            long code = hashing.hash(address + TAG + i);  

            SNode current = new SNode(server, code);  

            if (header == null) {  

                header = current;  

            }  

            current.setPrev(prev);  

            innerPool.put(code, current);  

            prev = current;  

        }  

    }  

    /** 

     * 删除物理server地址,伴随着虚拟节点的删除 

     * @param address 

     */  

    public synchronized void removeServer(String address) {  

        long code = hashing.hash(address + TAG + (VIRTUAL_NODES_NUMBER - 1));  

        SNode current = innerPool.get(code);  

        if(current == null){  

            return;  

        }  

        if(!current.getAddress().equalsIgnoreCase(address)){  

            return;  

        }  

        current.getServer().close();;  

        while (current != null) {  

            current = innerPool.remove(current.getCode()).getPrev();  

        }  

  

    }  

  

    /** 

     * 根据指定的key,获取此key应该命中的物理server信息 

     * @param key 

     * @return 

     */  

    public InnerServer getServer(String key) {  

        long code = hashing.hash(key);  

        SNode snode = innerPool.lowerEntry(code).getValue();  

        if (snode == null) {  

            snode = innerPool.firstEntry().getValue();  

        }  

        return snode.getServer();  

    }  

  

  

    /** 

     * 虚拟节点描述 

     */  

    class SNode {  

        Long code;  

        InnerServer server;  

        SNode prev;  

  

        SNode(InnerServer server, Long code) {  

            this.server = server;  

            this.code = code;  

        }  

  

        SNode getPrev() {  

            return prev;  

        }  

  

        void setPrev(SNode prev) {  

            this.prev = prev;  

        }  

  

        Long getCode() {  

            return this.code;  

        }  

  

        InnerServer getServer() {  

            return server;  

        }  

        String getAddress(){  

            return server.ip + ":" + server.port;  

        }  

    }  

  

    /** 

     * hashcode生成 

     */  

    class Hashing {  

        //少量优化性能  

        private ThreadLocal<MessageDigest> md5Holder = new ThreadLocal<MessageDigest>();  

        private Charset DEFAULT_CHARSET = Charset.forName("utf-8");  

  

        public long hash(String key) {  

            return hash(key.getBytes(DEFAULT_CHARSET));  

        }  

  

        public long hash(byte[] key) {  

            try {  

                if (md5Holder.get() == null) {  

                    md5Holder.set(MessageDigest.getInstance("MD5"));  

                }  

            } catch (NoSuchAlgorithmException e) {  

                throw new IllegalStateException("no md5 algorythm found");  

            }  

            MessageDigest md5 = md5Holder.get();  

  

            md5.reset();  

            md5.update(key);  

            byte[] bKey = md5.digest();  

            long res = ((long) (bKey[3] & 0xFF) << 24)  

                    | ((long) (bKey[2] & 0xFF) << 16)  

                    | ((long) (bKey[1] & 0xFF) << 8) | (long) (bKey[0] & 0xFF);  

            return res;  

        }  

    }  

  

    /** 

     * 与物理server的TCP链接,用于实际的IO操作 

     */  

    class InnerServer {  

        String ip;  

        int port;  

        Socket socket;  

  

        InnerServer(String ip, int port) {  

            this.ip = ip;  

            this.port = port;  

        }  

  

        synchronized void init() throws Exception {  

            SocketAddress socketAddress = new InetSocketAddress(ip, port);  

            socket = new Socket();  

            socket.connect(socketAddress, 30000);  

        }  

  

        public boolean write(byte[] sources) {  

            //TODO  

            return true;  

        }  

  

        public byte[] read() {  

            //TODO  

            return new byte[]{};  

        }  

  

        public void close(){  

             if(socket == null || socket.isClosed()){  

                 return;  

             }  

            try{  

                socket.close();  

            } catch (Exception e){  

                //  

            }  

        }  

    }  

}  

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: