您的位置:首页 > 其它

一致性hash在分布式系统中的应用

2017-06-12 23:04 946 查看

场景

如果要设计一套KV存储的系统,用户PUT一个key和value,存储到系统中,并且提供用户根据key来GET对应的value。要求随着用户规模变大,系统是可以水平扩展的,主要要解决以下几个问题。

系统是一个集群,包含很多节点,如何解决用户数据的存储问题?保证用户的数据尽可能平均分散到各个节点上。

如果用户量增长,需要对集群进行扩容,扩容完成后如何解决数据重新分布?保证不会出现热点数据节点。

方案一:取模hash

要设计上面的系统,最简单的方案就是取模hash。基本的原理就是:假设集群一共有N台机器提供服务,对于用户的请求编号,比如编号M,那么就把这个请求通过取模发送到指定机器。

机器序号=M%N

举个例子,比如有下面这些机器

0.192.168.1.1
1.192.168.2.2
2.192.168.3.3
3.192.168.4.4

用户PUT100个请求,此时客户端(可以设计)带上一个编号,分别是1-100,那么

1%4=1<<-->>192.168.2.2
2%4=2<<-->>192.168.3.3
3%4=3<<-->>192.168.4.4
...
100%4=0<<-->>192.168.1.1

这样就可以很简单把用户的请求负载均衡到4台机器上了,解决了第一个问题。可以看看下面代码实现

content="""Consistenthashingisaspecialkindofhashingsuchthatwhenahashtableisresizedandconsistenthashingisused,onlyK/nkeysneedtoberemappedonaverage,whereKisthenumberofkeys,andnisthenumberofslots.Incontrast,inmosttraditionalhashtables,achangeinthenumberofarrayslotscausesnearlyallkeystoberemapped."""

###所有机器列表
servers=[
"192.168.1.1",
"192.168.2.2",
"192.168.3.3",
"192.168.4.4"
]

classNormalHash(object):
"""NormalHash"""
def__init__(self,nodes=None):
ifnodes:
self.nodes=nodes
self.number=len(nodes)

defget_node(self,index):
"""Returnnodebyindex%serversnumber
"""
ifindex<0:
returnNone
returnself.nodes[index%self.number]

defnormal_hash():
"""Normalhashusageexample"""
nh=NormalHash(servers)
words=content.split()

#模拟初始化每天机器的db
database={}
forsinservers:
database[s]=[]

foriinxrange(len(words)):
database[nh.get_node(i)].append(words[i])

printdatabase

上面这部分是客户端的代码,NormalHash其实可以是在服务端实现,客户端每次要PUT或者GET一个key,就调用服务端的sdk,获取对应机器,然后操作。

取模hash情况下扩容机器

取模hash有一个明显的缺点,就是上面提出的第二个问题,如何解决扩容机器后数据分布的问题?继续上面的例子,比如这时候要新增一台机器,机器规模变成

0.192.168.1.1
1.192.168.2.2
2.192.168.3.3
3.192.168.4.44.192.168.5.5

那么问题就来了,如果现在用户要通过GET请求数据,同样还是1-100的请求编号,这时候取模就变成

i%5

1%5=1<<-->>192.168.2.2
2%5=2<<-->>192.168.3.3
3%5=3<<-->>192.168.4.4
4%5=4<<-->>192.168.5.5->>这里开始就变化了
...

很显然,对于新的PUT操作不会有影响,但是对于用户老的数据GET请求,数据就不一致了,这时候必须要进行移数据,可以推断出,这里的数据变更是很大的,在80%左右。

但是,如果扩容的集群是原来的倍数,之前是N台,现在扩容到M*N台,那么数据迁移量是50%。

取模hash总结

取模hash能解决负载均衡问题,而且实现很简单,维护meta信息成本也很小,但是扩容集群的时候,最好是按照整数倍扩容,否则数据迁移成本太高。

我个人觉得,取模hash已经能满足业务比较小的场景了,在机器只有几台或者几十台的时候,完全能够应付了。而且这种方案很简洁,实现起来很容易,很容易理解。

方案二:一致性hash

一致性hash基本实现如下图,这张图最早出现在是memcached分布式实现里。如何理解一致性hash呢?



首先我们设计一个环,假设这个环是由2^32-1个点组成,也就是说[0,2^32)上的任意一个点都能在环上找到。

现在采用一个算法(md5就可以),把我们集群中的服务器以ip地址作为key,然后根据算法得到一个值,这个值映射到环上的一个点,然后还有对应的数据存储区间

IP地址hashvalue(例子)数据范围
192.168.1.1-->>1000-->>(60000,1000](可以看环来理解,和时钟一样)
192.168.2.2-->>8000-->>(1000,8000]
192.168.3.3-->>25000-->>(8000,25000]
192.168.4.4-->>60000-->>(25000,60000]


用户的请求过来后,对key进行hash,也映射到环上的一个点,根据ip地址的数据范围存储到对应的节点上,图上粉红色的点就代表数据映射后的环上位置,然后箭头就是代表存储的节点位置

一致性hash情况下扩容机器

一致性hash在某种程度上是可以解决数据的负载均衡问题的,再来看看扩容的情况,这时候新增加一个节点,图



机器情况变成

IP地址hashvalue(例子)数据范围
192.168.1.1-->>1000-->>(60000,1000](注意:取模后的逻辑大小)
192.168.2.2-->>8000-->>(1000,8000]
192.168.5.5-->>15000-->>(8000,15000](新增的)
192.168.3.3-->>25000-->>(15000,25000]
192.168.4.4-->>60000-->>(25000,60000]

这时候被影响的数据范围仅仅是(8000,15000]的数据,这部分需要做迁移。同样的如果有一台机器宕机,那么受影响的也只是比这台机器对应环上的点大,比下一个节点值小的点。

一致性hash总结

一致性hash能解决热点分布的问题,对于缩容和扩容也能低成本进行。但是一致性hash在小规模集群中,就会有问题,很容易出现数据热点分布不均匀的现象,因为当机器数量比较少的时候,hash出来很有可能各自几点管理的“范围”有大有小。而且一旦规模比较小的情况下,如果数据原本是均匀分布的,这时候新加入一个节点,就会影响数据分布不均匀。

虚拟节点

虚拟节点可以解决一致性hash在节点比较少的情况下的问题,简单而言就是在一个节点实际虚拟出多个节点,对应到环上的值,然后按照顺时针或者逆时针划分区间

下面贴上一致性hash的代码,replicas实现了虚拟节点,当replicas=1的时候,就退化到上面的图,一个节点真实对应到一个环上的点。

#-*-coding:UTF-8-*-

importmd5

content="""Consistenthashingisaspecialkindofhashingsuchthatwhenahashtableisresizedandconsistenthashingisused,onlyK/nkeysneedtoberemappedonaverage,whereKisthenumberofkeys,andnisthenumberofslots.Incontrast,inmosttraditionalhashtables,achangeinthenumberofarrayslotscausesnearlyallkeystoberemapped."""

#所有机器列表
servers=[
"192.168.1.1",
"192.168.2.2",
"192.168.3.3",
"192.168.4.4"
]

classHashRing(object):

def__init__(self,nodes=None,replicas=3):
"""Managesahashring.

`nodes`isalistofobjectsthathaveaproper__str__representation.
`replicas`indicateshowmanyvirtualpointsshouldbeusedpr.node,
replicasarerequiredtoimprovethedistribution.
"""
self.replicas=replicas

self.ring=dict()
self._sorted_keys=[]

ifnodes:
fornodeinnodes:
self.add_node(node)

defadd_node(self,node):
"""Addsa`node`tothehashring(includinganumberofreplicas).
"""
foriinxrange(0,self.replicas):
key=self.gen_key('%s:%s'%(node,i))
self.ring[key]=node
self._sorted_keys.append(key)

self._sorted_keys.sort()

defremove_node(self,node):
"""Removes`node`fromthehashringanditsreplicas.
"""
foriinxrange(0,self.replicas):
key=self.gen_key('%s:%s'%(node,i))
delself.ring[key]
self._sorted_keys.remove(key)

defget_node(self,string_key):
"""Givenastringkeyacorrespondingnodeinthehashringisreturned.

Ifthehashringisempty,`None`isreturned.
"""
returnself.get_node_pos(string_key)[0]

defget_node_pos(self,string_key):
"""Givenastringkeyacorrespondingnodeinthehashringisreturned
alongwithit'spositioninthering.

Ifthehashringisempty,(`None`,`None`)isreturned.
"""
ifnotself.ring:
returnNone,None

key=self.gen_key(string_key)

nodes=self._sorted_keys
foriinxrange(0,len(nodes)):
node=nodes[i]
ifkey<=node:
returnself.ring[node],i

returnself.ring[nodes[0]],0

defget_nodes(self,string_key):
"""Givenastringkeyitreturnsthenodesasageneratorthatcanholdthekey.

Thegeneratorisneverendinganditeratesthroughthering
startingatthecorrectposition.
"""
ifnotself.ring:
yieldNone,None

node,pos=self.get_node_pos(string_key)
forkeyinself._sorted_keys[pos:]:
yieldself.ring[key]

whileTrue:
forkeyinself._sorted_keys:
yieldself.ring[key]

defgen_key(self,key):
"""Givenastringkeyitreturnsalongvalue,
thislongvaluerepresentsaplaceonthehashring.

md5iscurrentlyusedbecauseitmixeswell.
"""
m=md5.new()
m.update(key)
returnlong(m.hexdigest(),16)

defconsistent_hash():

#模拟初始化每天机器的db
database={}
forsinservers:
database[s]=[]

hr=HashRing(servers)

forwinwords.split():
database[hr.get_node(w)].append(w)

printdatabase

consistent_hash()



from:http://www.firefoxbug.com/index.php/archives/2791/
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: