您的位置:首页 > 其它

一致性hash实现

2018-04-19 18:41 399 查看

缘起

一致性hash也算是接触到过很多回了,不过一直没有自己真正去实现过,今天在一致性hash在分布式系统中的应用 看到了一个简单的Python版本实现,正好这两天在学习scala,尝试着用scala实现了一版。

一致性hash

分布式系统

一致性hash一般都是在分布式系统中应用,分布式系统的目的之二就是提高系统可用性和容量,可用性要求我们在机器挂掉之后能快速恢复,容量要求我们要用多台机器来存储数据,一致性hash提供了一种思想,一种如何将请求,数据均匀分布到各台机器上的一种思想。

使用场景

KV系统

缓存系统

负载均衡系统

原理

先来看一张图


上面这张图是一致性hash的简单描述,这是一个有2^32个位置的圆,每个node根据ID(可以是主机名也可以是ip)分布到环上,在数据过来之后,将数据取hash值,向后找到离自己最近的那个节点,就由这个节点来提供具体的服务了。

在节点数较少的时候,因为node也是通过计算hash来确认自己在环上的位置,想要保证均匀分布还是很困难的。我们可以通过加入虚拟节点来保证,每个节点有n个replica,这样可以做到相对均匀。

原文是使用Python实现的,在这我使用scala实现一下,也算是动手实践一下scala编程。

注:不要使用Object的hashCode方法,在只有i变化时,生成的hash也是连续的,我使用md5摘要后做了一下变通

import java.security.MessageDigest

import scala.collection.mutable

/**
* Created by liufengquan on 2018/4/13.
*/
class ConsistentHash(virtualNodes: Int) {
val servers: mutable.Set[String] = mutable.Set()

/** store virtual node's hash */
private var hashRing = List[Int]()
/** virtual node's hash -> node identity*/
private var hashServerMap = mutable.Map[Int, String]()

/**
* put a node, that is a server in pool
* @param ip
*/
def addNode(ip: String): Unit = {
servers += ip
(1 to virtualNodes).foreach(i => {
val key = getKeyHash(ip + ":" + i)
hashRing = key :: hashRing
hashServerMap += key -> ip
})

hashRing = hashRing.sorted
}

/**
* remove a node
* @param ip
*/
def removeNode(ip: String): Unit = {
servers -= ip
(1 to virtualNodes).foreach(i => {
val key = getKeyHash(ip + "i")
hashRing = hashRing.filter(_ != key)
hashServerMap = hashServerMap.filter(_._1 != key)
})
}

/**
* get a node according to a key
* @param key
* @return
*/
def getNode(key: String): String = {
require(servers != null && servers.nonEmpty)

val hash = getKeyHash(key)
val avail = hashRing.filter(_ > hash)
if (avail.nonEmpty) hashServerMap(avail.head)
else hashServerMap(hashRing.head)
}

def getServers: String = servers mkString ":"

/**
* naive hashCode has bad randomness,
* use md5 and do a little exchange
* @param key
* @return
*/
private def getKeyHash(key: String): Int = {
val digest = MessageDigest.getInstance("MD5")

val bytes = digest.digest(key.getBytes("UTF-8"))

bytes.map(_.toInt).reduceLeft(_ * 11 + _)
}
}

object ConsistentHash {
val servers = List("10.10.187.1", "10.10.187.2", "10.10.187.3", "10.10.187.4", "10.10.187.5")

def consistentHashTest(replica: Int): Unit = {
val consistentHash = new ConsistentHash(replica)

var map = mutable.Map[String, List[String]]()
val text = """The following examples show some safe operations to drop or change columns. Dropping the final column in a table lets Impala ignore the data causing any disruption to existing data files. Changing the type of a column works if existing data values can be safely converted to the new type. The type conversion rules depend on the file format of the underlying table. For example, in a text table, the same value can be interpreted as a STRING or a numeric value, while in a binary format such as Parquet, the rules are stricter and type conversions only work between certain sizes of integers."""

servers.foreach(consistentHash.addNode)

servers.foreach(map += _ -> List[String]())

text.split(" ").foreach(s => {
val server = consistentHash.getNode(s)
map(server) = s :: map(server)
})

println(map mkString "\n")
}

def main(args: Array[String]): Unit = {
require(args.length > 0)
consistentHashTest(args(0).toInt)
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: