
Cassandra Source Code Analysis 3: Distributed Hash Table (DHT) and the Locator

2010-06-30 18:47

Principles

Distributed storage spreads data across multiple hosts. When storing or reading a [key, value] pair, the key selects the target machine, so the stored objects are scattered across different machines by key. The simplest scheme is to take the hash modulo the number of hosts: with n machines numbered (0, 1, 2, ..., n - 1), compute an integer hash value from the key and pick the machine at index hash % n. The problem with this modular scheme is that the divisor changes with the cluster size (% 2 with two nodes, % 3 with three), so when the number of nodes changes, most keys map to a different machine.
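A quick way to see how badly modular hashing behaves is to count how many keys change owner when one node is added. The sketch below (class and key names invented for illustration) grows a mod-N cluster from 3 to 4 nodes; roughly three quarters of the keys move:

```java
// Illustrative sketch only: how many keys change target machine
// when a hash % n cluster grows from n to n + 1 nodes.
public class ModHashDemo {
    static int nodeFor(String key, int n) {
        // Math.floorMod keeps the index non-negative even for negative hashCodes
        return Math.floorMod(key.hashCode(), n);
    }

    static int movedKeys(int keyCount, int before, int after) {
        int moved = 0;
        for (int i = 0; i < keyCount; i++) {
            String key = "key-" + i;
            if (nodeFor(key, before) != nodeFor(key, after))
                moved++;
        }
        return moved;
    }

    public static void main(String[] args) {
        // With hash % n, most keys map to a different node after adding one node.
        System.out.println(movedKeys(10000, 3, 4) + " of 10000 keys moved");
    }
}
```

For uniform hashes only the keys with h % 3 == h % 4 stay put (about 1 in 4), which is exactly the "server churn" consistent hashing is designed to avoid.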

Cassandra's DHT uses consistent hashing.

Goal: when nodes are added or removed, avoid the server churn caused by rehashing most keys.

Implementation: fix the hash value space and bend it into a ring, 0 to 2^32. Each node (host) generates a hash value, which maps the node onto the ring. For a given key, compute its hash value and walk clockwise along the ring; the first node encountered (or the first N nodes, if N replicas are needed) is the target host (see the ring diagram, not reproduced here).

Adding a node: clearly, when a node is added, only a small slice of the hash space is re-split by the new node and the other intervals are unaffected. In the (omitted) figure, only the interval between node5 and node2 is rehashed from node2 to node5.
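The ring behaviour described above can be sketched with a sorted map. This is a minimal consistent-hash ring (one hand-picked long token per node, no replicas; Cassandra derives tokens from MD5 instead), showing both the clockwise lookup and the limited impact of adding a node:

```java
import java.util.Map;
import java.util.TreeMap;

// Minimal consistent-hash ring sketch: tokens and node names are invented.
public class Ring {
    private final TreeMap<Long, String> ring = new TreeMap<>();

    void addNode(long token, String node) { ring.put(token, node); }

    // Walk clockwise from the key's token; wrap to the smallest token at the end.
    String ownerOf(long keyToken) {
        Map.Entry<Long, String> e = ring.ceilingEntry(keyToken);
        return (e != null ? e : ring.firstEntry()).getValue();
    }

    public static void main(String[] args) {
        Ring r = new Ring();
        r.addNode(100, "node1");
        r.addNode(200, "node2");
        r.addNode(300, "node3");
        System.out.println(r.ownerOf(150)); // node2 is the first node clockwise
        System.out.println(r.ownerOf(350)); // past the largest token: wraps to node1

        // Adding node5 at token 150 only remaps keys in (100, 150];
        // every other interval keeps its owner.
        r.addNode(150, "node5");
        System.out.println(r.ownerOf(120)); // now node5
        System.out.println(r.ownerOf(180)); // still node2
    }
}
```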

Cassandra's implementation

Generating a node's hash value

As shown below, at server startup the node's Token is inserted into the global token table (TokenMetadata, which stores the tokens of every node in the cluster). (Note: the token is the hash value discussed above. It can be an Integer token or a String token; anything that can be compared and sorted works as a token.) To obtain its token, a node first checks the "LocationInfo" table (column family): if a token exists there it is reused; otherwise a new random token is generated and stored. This way each node's token is generated only once.

// StorageService.initServer
SystemTable.setBootstrapped(true);
Token token = storageMetadata_.getToken();
tokenMetadata_.updateNormalToken(token, FBUtilities.getLocalAddress());

// SystemTable.initMetadata
ColumnFamily cf = table.getColumnFamilyStore(STATUS_CF).getColumnFamily(filter);
IPartitioner p = StorageService.getPartitioner();
if (cf == null)
{
    Token token;
    String initialToken = DatabaseDescriptor.getInitialToken();
    if (initialToken == null)
        token = p.getRandomToken();
    else
        token = p.getTokenFactory().fromString(initialToken);

    int generation = (int) (System.currentTimeMillis() / 1000);

    RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, LOCATION_KEY);
    cf = ColumnFamily.create(Table.SYSTEM_TABLE, SystemTable.STATUS_CF);
    cf.addColumn(new Column(TOKEN, p.getTokenFactory().toByteArray(token)));
    cf.addColumn(new Column(GENERATION, FBUtilities.toByteArray(generation)));
    cf.addColumn(new Column(CLUSTERNAME, DatabaseDescriptor.getClusterName().getBytes()));
    rm.add(cf);
    rm.apply();

    metadata = new StorageMetadata(token, generation, DatabaseDescriptor.getClusterName().getBytes());
    return metadata;
}


How the random token is generated depends on the partitioner. The default is RandomPartitioner, which uses a GUID to produce a BigInteger token. (Note: random generation can hardly guarantee that the nodes split the hash space evenly. With four nodes, an even split would assign the tokens 0, 2^30, 2 * 2^30 and 3 * 2^30.)

GuidGenerator pipeline:

"s_id(host):System.currentTimeMillis:Random.nextLong" -> md5.hash -> hex string -> GUID standard format -> md5.hash -> BigInteger -> abs

// RandomPartitioner.getRandomToken
String guid = GuidGenerator.guid();
BigInteger token = FBUtilities.hash(guid);
if (token.signum() == -1)
    token = token.multiply(BigInteger.valueOf(-1L));
return new BigIntegerToken(token);

// GuidGenerator
public static byte[] guidAsBytes()
{
    StringBuilder sbValueBeforeMD5 = new StringBuilder();
    long time = System.currentTimeMillis();
    long rand = myRand.nextLong();
    sbValueBeforeMD5.append(s_id)
                    .append(":")
                    .append(Long.toString(time))
                    .append(":")
                    .append(Long.toString(rand));
    String valueBeforeMD5 = sbValueBeforeMD5.toString();
    return md5.digest(valueBeforeMD5.getBytes());
}
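The essence of the pipeline above can be reproduced in a self-contained way with the JDK alone: MD5-hash a "host:time:random" string, interpret the digest as a signed BigInteger, and take the absolute value. (The class and method names below are invented for the sketch; FBUtilities.hash performs essentially the MD5-to-BigInteger step.)

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

// Sketch of the GUID-to-token pipeline: MD5 digest -> signed BigInteger -> abs.
public class TokenSketch {
    static BigInteger tokenOf(String guidLikeString) throws Exception {
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        byte[] digest = md5.digest(guidLikeString.getBytes(StandardCharsets.UTF_8));
        BigInteger token = new BigInteger(digest); // signed: may come out negative
        return token.signum() == -1 ? token.negate() : token;
    }

    public static void main(String[] args) throws Exception {
        String seed = "host-1:" + System.currentTimeMillis() + ":" + 42L;
        System.out.println(tokenOf(seed)); // non-negative, at most 128 bits
    }
}
```

Because the digest is 16 bytes, all tokens land in [0, 2^128), which is the ring the BigIntegerToken values are ordered on.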


TokenMetadata maintains the global token table for all nodes. When a node leaves or is added, this global table is updated (analyzed separately in the next post).

Finding the target host for a given key

Clearly, both reads and writes must first determine the target server. Here is the sequence diagram for a weak read (reading from any single host is enough); the diagram is not reproduced here.

In place of the figure, a simplified call chain:

CassandraServer.get_slice
  -> StorageProxy.readProtocol
  -> StorageService.getNaturalEndpoints
  -> StorageProxy.weakReadLocal / StorageProxy.weakReadRemote
  -> StorageService.findSuitableEndPoint

getNaturalEndpoints

Finding the natural endpoints (the target hosts; N = number of data replicas) takes two steps:

1. The partitioner derives the key's token; RandomPartitioner generates it with an MD5 hash.

2. The replication strategy locates the target hosts. RackUnawareStrategy uses the consistent hashing described above: sort the tokens of all nodes, binary-search for the position of the given token in the sorted sequence, and from that position take the hosts of the next N tokens as the targets. (The other replication strategies mainly consider the physical placement of the hosts: they try to find hosts in a different rack or data center.)

// StorageService
getNaturalEndpoints(command.table, command.key)
{
    return getReplicationStrategy(table).getNaturalEndpoints(partitioner_.getToken(key), table);
}

// RandomPartitioner
public BigIntegerToken getToken(String key)
{
    if (key.isEmpty())
        return MINIMUM;
    return new BigIntegerToken(FBUtilities.hash(key));
}

// RackUnawareStrategy.getNaturalEndpoints
int replicas = DatabaseDescriptor.getReplicationFactor(table);
List<Token> tokens = metadata.sortedTokens();
ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>(replicas);
if (tokens.isEmpty())
    return endpoints;
// Add the token at the index by default
Iterator<Token> iter = TokenMetadata.ringIterator(tokens, token);
while (endpoints.size() < replicas && iter.hasNext())
{
    endpoints.add(metadata.getEndPoint(iter.next()));
}
return endpoints;


findSuitableEndPoint

Returns the nearest live node among the natural endpoints.

// StorageService
public InetAddress findSuitableEndPoint(String table, String key) throws IOException, UnavailableException
{
    List<InetAddress> endpoints = getNaturalEndpoints(table, key);
    DatabaseDescriptor.getEndPointSnitch(table).sortByProximity(FBUtilities.getLocalAddress(), endpoints);
    for (InetAddress endpoint : endpoints)
    {
        if (FailureDetector.instance.isAlive(endpoint))
            return endpoint;
    }
    throw new UnavailableException(); // no nodes that could contain key are alive
}


Java tips

Google Collections (to be introduced separately)

BiMap<Token, InetAddress> tokenToEndPointMap (Token and host form a one-to-one mapping)

AbstractIterator: starting from some position in (0, n-1), visit every index in turn; the index after n-1 is 0 (a ring), and iteration ends once it returns to the start position (one full pass).

return new AbstractIterator<Token>()
{
    int j = startIndex;

    protected Token computeNext()
    {
        if (j < 0)
            return endOfData();
        try
        {
            return (Token) ring.get(j);
        }
        finally
        {
            j = (j + 1) % ring.size();
            if (j == startIndex)
                j = -1;
        }
    }
};
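Combining the binary-search step with the wrap-around pass can be sketched in a self-contained way. The helper below (class and method names invented for the sketch, using plain longs instead of Cassandra's Token type) returns the tokens in ring order starting at the first token >= the key's token, which is exactly the order the replica list is filled in:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Sketch of ring iteration: binary-search the start index, then wrap around.
public class RingIterator {
    static List<Long> ringOrder(List<Long> sortedTokens, long keyToken) {
        int i = Collections.binarySearch(sortedTokens, keyToken);
        // binarySearch returns -(insertionPoint) - 1 when the token is absent
        int start = i < 0 ? -i - 1 : i;
        if (start == sortedTokens.size())
            start = 0; // past the largest token: wrap to the smallest
        List<Long> order = new ArrayList<>();
        for (int j = 0; j < sortedTokens.size(); j++)
            order.add(sortedTokens.get((start + j) % sortedTokens.size()));
        return order;
    }

    public static void main(String[] args) {
        List<Long> tokens = Arrays.asList(100L, 200L, 300L);
        System.out.println(ringOrder(tokens, 150)); // [200, 300, 100]
        System.out.println(ringOrder(tokens, 350)); // [100, 200, 300]
    }
}
```

Taking the first N entries of this order (mapped back to hosts) gives the natural endpoints for replication factor N.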


Determining the physical relationship between two hosts

public boolean isOnSameRack(InetAddress host, InetAddress host2) throws UnknownHostException
{
    /*
     * Look at the IP Address of the two hosts. Compare
     * the 3rd octet. If they are the same then the hosts
     * are in the same rack else different racks.
     */
    byte[] ip = host.getAddress();
    byte[] ip2 = host2.getAddress();
    return ip[2] == ip2[2];
}

public boolean isInSameDataCenter(InetAddress host, InetAddress host2) throws UnknownHostException
{
    /*
     * Look at the IP Address of the two hosts. Compare
     * the 2nd octet. If they are the same then the hosts
     * are in the same datacenter else different datacenter.
     */
    byte[] ip = host.getAddress();
    byte[] ip2 = host2.getAddress();
    return ip[1] == ip2[1];
}


Sorting hosts by proximity to a given host

public List<InetAddress> sortByProximity(final InetAddress address, List<InetAddress> addresses)
{
    Collections.sort(addresses, new Comparator<InetAddress>()
    {
        public int compare(InetAddress a1, InetAddress a2)
        {
            try
            {
                if (address.equals(a1) && !address.equals(a2))
                    return -1;
                if (address.equals(a2) && !address.equals(a1))
                    return 1;
                if (isOnSameRack(address, a1) && !isOnSameRack(address, a2))
                    return -1;
                if (isOnSameRack(address, a2) && !isOnSameRack(address, a1))
                    return 1;
                if (isInSameDataCenter(address, a1) && !isInSameDataCenter(address, a2))
                    return -1;
                if (isInSameDataCenter(address, a2) && !isInSameDataCenter(address, a1))
                    return 1;
                return 0;
            }
            catch (UnknownHostException e)
            {
                throw new RuntimeException(e);
            }
        }
    });
    return addresses;
}


Determining whether two intervals on the ring intersect

public Set<Range> intersectionWith(Range that)
{
    if (this.contains(that))
        return rangeSet(that);
    if (that.contains(this))
        return rangeSet(this);

    boolean thiswraps = isWrapAround(left, right);
    boolean thatwraps = isWrapAround(that.left, that.right);
    if (!thiswraps && !thatwraps)
    {
        // neither wraps.  the straightforward case.
        if (!(left.compareTo(that.right) < 0 && that.left.compareTo(right) < 0))
            return Collections.emptySet();
        return rangeSet(new Range((Token) ObjectUtils.max(this.left, that.left),
                                  (Token) ObjectUtils.min(this.right, that.right)));
    }
    if (thiswraps && thatwraps)
    {
        // if the starts are the same, one contains the other, which we have already ruled out.
        assert !this.left.equals(that.left);
        // two wrapping ranges always intersect.
        // since we have already determined that neither this nor that contains the other, we have 2 cases,
        // and mirror images of those cases.
        // (1) both of that's (1, 2] endpoints lie in this's (A, B] right segment:
        //  ---------B--------A--2---1------>
        // (2) only that's start endpoint lies in this's right segment:
        //  ---------B----1---A-------2------>
        // or, we have the same cases on the left segment, which we can handle by swapping this and that.
        return this.left.compareTo(that.left) < 0
               ? intersectionBothWrapping(this, that)
               : intersectionBothWrapping(that, this);
    }
    if (thiswraps && !thatwraps)
        return intersectionOneWrapping(this, that);
    assert (!thiswraps && thatwraps);
    return intersectionOneWrapping(that, this);
}

private static Set<Range> intersectionBothWrapping(Range first, Range that)
{
    Set<Range> intersection = new HashSet<Range>(2);
    if (that.right.compareTo(first.left) > 0)
        intersection.add(new Range(first.left, that.right));
    intersection.add(new Range(that.left, first.right));
    return Collections.unmodifiableSet(intersection);
}

private static Set<Range> intersectionOneWrapping(Range wrapping, Range other)
{
    Set<Range> intersection = new HashSet<Range>(2);
    if (other.contains(wrapping.right))
        intersection.add(new Range(other.left, wrapping.right));
    // need the extra compareTo here because ranges are asymmetrical; wrapping.left _is not_ contained by the wrapping range
    if (other.contains(wrapping.left) && wrapping.left.compareTo(other.right) < 0)
        intersection.add(new Range(wrapping.left, other.right));
    return Collections.unmodifiableSet(intersection);
}