Cassandra Source Code Analysis 3: Distributed Hash Table (DHT) and Locator
2010-06-30 18:47
Principles
Distributed storage spreads data across multiple hosts. When storing or reading a [key, value] pair, the key selects the target machine; that is, the key's value is used to scatter stored objects across different machines. The simplest hashing scheme is modulo over the host count: with n machines numbered (0, 1, 2, ..., n - 1), compute an integer hash value for the key and take hash value % n; the remainder identifies the machine. The problem with modulo hashing is that the mapping depends on n: with two nodes you take % 2, with three nodes % 3, so most keys map to a different machine whenever the node count changes. Cassandra's DHT uses consistent hashing instead.
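A minimal, self-contained illustration of the remapping problem (not Cassandra code; the class name and key format are invented for the example):

import java.lang.String;

public class ModuloRemapDemo
{
    public static void main(String[] args)
    {
        int moved = 0, total = 10000;
        for (int i = 0; i < total; i++)
        {
            int hash = ("key-" + i).hashCode() & 0x7fffffff; // non-negative hash
            if (hash % 2 != hash % 3)  // owner under 2 nodes vs. owner under 3 nodes
                moved++;
        }
        // Roughly two thirds of the keys change owners when n grows from 2 to 3.
        System.out.println(moved + " of " + total + " keys moved");
    }
}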
Goal: avoid the cluster-wide churn caused by rehashing when nodes are added or removed.
Implementation: fix the hash value space and bend it into a ring, 0 ~ 2^32. Each node (host) is assigned a hash value, mapping the node onto the ring. For a given key, compute its hash value and walk clockwise along the ring; the first node encountered (or the first N nodes, if N replicas are needed) is the target host. (The original post borrowed a figure here; it is omitted.)
Adding a node: clearly, adding one node re-partitions only a small slice of the hash space, and the other intervals are unaffected. (Another borrowed figure, also omitted.) In that figure, only the interval between node5 and node2 is rehashed from node2 to node5. A minimal ring sketch follows below.
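A minimal consistent-hash ring sketch using a sorted map (an illustration of the principle only, not Cassandra's implementation; all names are invented):

import java.util.SortedMap;
import java.util.TreeMap;

public class ConsistentHashRing
{
    // token -> host; the sorted keys of the TreeMap model the ring
    private final TreeMap<Integer, String> ring = new TreeMap<Integer, String>();

    public void addNode(int token, String host) { ring.put(token, host); }
    public void removeNode(int token)           { ring.remove(token); }

    // Walk clockwise from the key's token to the first node; wrap around past the largest token.
    public String ownerOf(int keyToken)
    {
        SortedMap<Integer, String> tail = ring.tailMap(keyToken);
        Integer token = tail.isEmpty() ? ring.firstKey() : tail.firstKey();
        return ring.get(token);
    }

    public static void main(String[] args)
    {
        ConsistentHashRing r = new ConsistentHashRing();
        r.addNode(100, "node2");
        r.addNode(200, "node5");
        System.out.println(r.ownerOf(150)); // node5: first token clockwise from 150
        System.out.println(r.ownerOf(250)); // node2: wraps past the largest token
    }
}

Note that addNode inserts a single token into the sorted map, so only keys in the interval between the new token and its predecessor change owners, which is exactly the property described above.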
Cassandra Implementation
Generating a node's token
As shown below, at server startup the node's Token is placed into the global token table (TokenMetadata, which stores the tokens of every node in the cluster). Note: a token is the hash value discussed above; it may be an Integer token or a String token — anything comparable and sortable can serve as a token. The node's token is first looked up in the "LocationInfo" table (column family); if present it is reused, otherwise a new random token is generated and stored. This way each node's token is generated exactly once.

//StorageService.initServer
SystemTable.setBootstrapped(true);
Token token = storageMetadata_.getToken();
tokenMetadata_.updateNormalToken(token, FBUtilities.getLocalAddress());

//SystemTable.initMetadata
ColumnFamily cf = table.getColumnFamilyStore(STATUS_CF).getColumnFamily(filter);
IPartitioner p = StorageService.getPartitioner();
if (cf == null)
{
    Token token;
    String initialToken = DatabaseDescriptor.getInitialToken();
    if (initialToken == null)
        token = p.getRandomToken();
    else
        token = p.getTokenFactory().fromString(initialToken);

    int generation = (int) (System.currentTimeMillis() / 1000);

    RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, LOCATION_KEY);
    cf = ColumnFamily.create(Table.SYSTEM_TABLE, SystemTable.STATUS_CF);
    cf.addColumn(new Column(TOKEN, p.getTokenFactory().toByteArray(token)));
    cf.addColumn(new Column(GENERATION, FBUtilities.toByteArray(generation)));
    cf.addColumn(new Column(CLUSTERNAME, DatabaseDescriptor.getClusterName().getBytes()));
    rm.add(cf);
    rm.apply();
    metadata = new StorageMetadata(token, generation, DatabaseDescriptor.getClusterName().getBytes());
    return metadata;
}
How a random token is generated depends on the partitioner; the default is RandomPartitioner, which uses a GUID to produce a BigInteger token. (Note: random generation rarely splits the hash space evenly. With four nodes, an even split would place the tokens at 0, 2^30, 2 * 2^30, and 3 * 2^30 in a 2^32 space; see the sketch after the code below.)
GuidGenerator pipeline:
"s_id(host):System.currentTimeMillis:Random.nextLong" -> md5.hash -> hex string -> GUID standard format -> md5.hash -> BigInteger -> abs
//RandomPartitioner.getRandomToken
String guid = GuidGenerator.guid();
BigInteger token = FBUtilities.hash(guid);
if (token.signum() == -1)
    token = token.multiply(BigInteger.valueOf(-1L));
return new BigIntegerToken(token);

//GuidGenerator
public static byte[] guidAsBytes()
{
    StringBuilder sbValueBeforeMD5 = new StringBuilder();
    long time = System.currentTimeMillis();
    long rand = 0;
    rand = myRand.nextLong();
    sbValueBeforeMD5.append(s_id)
                    .append(":")
                    .append(Long.toString(time))
                    .append(":")
                    .append(Long.toString(rand));
    String valueBeforeMD5 = sbValueBeforeMD5.toString();
    return md5.digest(valueBeforeMD5.getBytes());
}
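To avoid the uneven split noted above, an operator can compute evenly spaced tokens by hand and supply them through InitialToken. A sketch of the arithmetic (a hypothetical helper, not part of Cassandra; the 2^32 ring size matches the document's four-node example):

import java.math.BigInteger;

public class EvenTokens
{
    // token_i = i * ringSize / n; for n = 4 and ringSize = 2^32
    // this yields 0, 2^30, 2 * 2^30, 3 * 2^30.
    public static BigInteger[] compute(int n, BigInteger ringSize)
    {
        BigInteger[] tokens = new BigInteger[n];
        for (int i = 0; i < n; i++)
            tokens[i] = ringSize.multiply(BigInteger.valueOf(i))
                                .divide(BigInteger.valueOf(n));
        return tokens;
    }

    public static void main(String[] args)
    {
        for (BigInteger t : compute(4, BigInteger.ONE.shiftLeft(32)))
            System.out.println(t); // 0, 1073741824, 2147483648, 3221225472
    }
}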
TokenMetadata maintains the global token table (for all nodes); when a node leaves or joins, this global table is updated (analyzed separately in the next post).
Finding the target hosts for a given key
Clearly, both reads and writes must first determine the target servers. Here is the sequence for a weak read (reading from a single host suffices); the sequence diagram (another borrowed figure) is missing, so a simplified version follows:
CassandraServer.get_slice
-> StorageProxy.readProtocol
-> StorageService.getNaturalEndpoints
-> StorageProxy.weakReadLocal / StorageProxy.weakReadRemote
-> StorageService.findSuitableEndPoint
getNaturalEndpoints
Finding the natural endpoints (the target hosts; N = the number of data replicas) takes two steps:
1. The partitioner generates the key's token; RandomPartitioner generates it with an MD5 hash.
2. The replication strategy locates the target hosts. RackUnawareStrategy uses the consistent-hashing method described earlier: sort all node tokens, binary-search for the given token's position in the sequence, and from that position take the hosts of the next N tokens as the targets. (The other replication strategies mainly weigh the hosts' physical locations, trying to find hosts in different racks or data centers.)
//StorageService — called as getNaturalEndpoints(command.table, command.key)
public List<InetAddress> getNaturalEndpoints(String table, String key)
{
    return getReplicationStrategy(table).getNaturalEndpoints(partitioner_.getToken(key), table);
}

//RandomPartitioner.getToken
public BigIntegerToken getToken(String key)
{
    if (key.isEmpty())
        return MINIMUM;
    return new BigIntegerToken(FBUtilities.hash(key));
}

//RackUnawareStrategy.getNaturalEndpoints
int replicas = DatabaseDescriptor.getReplicationFactor(table);
List<Token> tokens = metadata.sortedTokens();
ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>(replicas);
if (tokens.isEmpty())
    return endpoints;
// Add the token at the index by default
Iterator<Token> iter = TokenMetadata.ringIterator(tokens, token);
while (endpoints.size() < replicas && iter.hasNext())
{
    endpoints.add(metadata.getEndPoint(iter.next()));
}
return endpoints;
findSuitableEndPoint
Returns the nearest live node among the natural endpoints:

//StorageService
public InetAddress findSuitableEndPoint(String table, String key) throws IOException, UnavailableException
{
    List<InetAddress> endpoints = getNaturalEndpoints(table, key);
    DatabaseDescriptor.getEndPointSnitch(table).sortByProximity(FBUtilities.getLocalAddress(), endpoints);
    for (InetAddress endpoint : endpoints)
    {
        if (FailureDetector.instance.isAlive(endpoint))
            return endpoint;
    }
    throw new UnavailableException(); // no nodes that could contain key are alive
}
Java Tips
Google Collections (covered separately): BiMap<Token, InetAddress> tokenToEndPointMap (Token and host form a one-to-one mapping).
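A quick sketch of the BiMap idea from Google Collections (later Guava); the token and address values here are made up:

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;

public class BiMapDemo
{
    public static void main(String[] args)
    {
        BiMap<String, String> tokenToEndPoint = HashBiMap.create();
        tokenToEndPoint.put("token-1", "10.0.0.1");

        System.out.println(tokenToEndPoint.get("token-1"));            // forward: token -> host
        System.out.println(tokenToEndPoint.inverse().get("10.0.0.1")); // reverse: host -> token, no second map needed
        // put() with an already-present value throws IllegalArgumentException,
        // which is what enforces the one-to-one Token <-> host mapping.
    }
}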
AbstractIterator: starting from some position within (0, n-1), visit every index in turn; the successor of n-1 is 0 (a ring), and iteration ends once it returns to the starting position (one full pass).
return new AbstractIterator<Token>()
{
    int j = startIndex;

    protected Token computeNext()
    {
        if (j < 0)
            return endOfData();
        try
        {
            return (Token) ring.get(j);
        }
        finally
        {
            j = (j + 1) % ring.size();
            if (j == startIndex)
                j = -1; // completed one full pass around the ring
        }
    }
};
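For context, startIndex is the position of the first token at or after the search token, typically obtained with a binary search over the sorted token list. A hedged sketch of that step (not the verbatim Cassandra code):

// ring is the sorted List<Token>; start is the token to search from.
int i = Collections.binarySearch(ring, start);
if (i < 0)
    i = -i - 1;                   // binarySearch returns -(insertionPoint) - 1 on a miss
int startIndex = i % ring.size(); // wrap: a token past the last entry maps to index 0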
Determining two hosts' physical locations
public boolean isOnSameRack(InetAddress host, InetAddress host2) throws UnknownHostException
{
    /*
     * Look at the IP Address of the two hosts. Compare
     * the 3rd octet. If they are the same then the hosts
     * are in the same rack else different racks.
     */
    byte[] ip = host.getAddress();
    byte[] ip2 = host2.getAddress();
    return ip[2] == ip2[2];
}

public boolean isInSameDataCenter(InetAddress host, InetAddress host2) throws UnknownHostException
{
    /*
     * Look at the IP Address of the two hosts. Compare
     * the 2nd octet. If they are the same then the hosts
     * are in the same datacenter else different datacenter.
     */
    byte[] ip = host.getAddress();
    byte[] ip2 = host2.getAddress();
    return ip[1] == ip2[1];
}
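To illustrate the octet heuristic (the addresses are invented for the example; the snitch class name follows the 0.6-era source):

EndPointSnitch snitch = new EndPointSnitch();
InetAddress a = InetAddress.getByName("10.1.3.7");
InetAddress b = InetAddress.getByName("10.1.3.42");
InetAddress c = InetAddress.getByName("10.2.9.1");
snitch.isOnSameRack(a, b);       // true:  3rd octets are both 3
snitch.isInSameDataCenter(a, c); // false: 2nd octets differ (1 vs. 2)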
Sorting hosts by proximity to a given host
public List<InetAddress> sortByProximity(final InetAddress address, List<InetAddress> addresses)
{
    Collections.sort(addresses, new Comparator<InetAddress>()
    {
        public int compare(InetAddress a1, InetAddress a2)
        {
            try
            {
                if (address.equals(a1) && !address.equals(a2))
                    return -1;
                if (address.equals(a2) && !address.equals(a1))
                    return 1;
                if (isOnSameRack(address, a1) && !isOnSameRack(address, a2))
                    return -1;
                if (isOnSameRack(address, a2) && !isOnSameRack(address, a1))
                    return 1;
                if (isInSameDataCenter(address, a1) && !isInSameDataCenter(address, a2))
                    return -1;
                if (isInSameDataCenter(address, a2) && !isInSameDataCenter(address, a1))
                    return 1;
                return 0;
            }
            catch (UnknownHostException e)
            {
                throw new RuntimeException(e);
            }
        }
    });
    return addresses;
}
Determining whether two intervals on the ring intersect
public Set<Range> intersectionWith(Range that)
{
    if (this.contains(that))
        return rangeSet(that);
    if (that.contains(this))
        return rangeSet(this);

    boolean thiswraps = isWrapAround(left, right);
    boolean thatwraps = isWrapAround(that.left, that.right);
    if (!thiswraps && !thatwraps)
    {
        // neither wraps. the straightforward case.
        if (!(left.compareTo(that.right) < 0 && that.left.compareTo(right) < 0))
            return Collections.emptySet();
        return rangeSet(new Range((Token)ObjectUtils.max(this.left, that.left),
                                  (Token)ObjectUtils.min(this.right, that.right)));
    }
    if (thiswraps && thatwraps)
    {
        // if the starts are the same, one contains the other, which we have already ruled out.
        assert !this.left.equals(that.left);
        // two wrapping ranges always intersect.
        // since we have already determined that neither this nor that contains the other, we have 2 cases,
        // and mirror images of those cases.
        // (1) both of that's (1, 2] endpoints lie in this's (A, B] right segment:
        //     ---------B--------A--2---1------>
        // (2) only that's start endpoint lies in this's right segment:
        //     ---------B----1---A-------2------>
        // or, we have the same cases on the left segment, which we can handle by swapping this and that.
        return this.left.compareTo(that.left) < 0
               ? intersectionBothWrapping(this, that)
               : intersectionBothWrapping(that, this);
    }
    if (thiswraps && !thatwraps)
        return intersectionOneWrapping(this, that);
    assert (!thiswraps && thatwraps);
    return intersectionOneWrapping(that, this);
}

private static Set<Range> intersectionBothWrapping(Range first, Range that)
{
    Set<Range> intersection = new HashSet<Range>(2);
    if (that.right.compareTo(first.left) > 0)
        intersection.add(new Range(first.left, that.right));
    intersection.add(new Range(that.left, first.right));
    return Collections.unmodifiableSet(intersection);
}

private static Set<Range> intersectionOneWrapping(Range wrapping, Range other)
{
    Set<Range> intersection = new HashSet<Range>(2);
    if (other.contains(wrapping.right))
        intersection.add(new Range(other.left, wrapping.right));
    // need the extra compareTo here because ranges are asymmetrical;
    // wrapping.left _is not_ contained by the wrapping range
    if (other.contains(wrapping.left) && wrapping.left.compareTo(other.right) < 0)
        intersection.add(new Range(wrapping.left, other.right));
    return Collections.unmodifiableSet(intersection);
}
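A small worked example of the wrap-around case (token values invented; a Range is exclusive on the left and inclusive on the right, so (10, 2] covers everything above 10 plus everything up to 2):

// Intersecting the wrapping (10, 2] with the non-wrapping (0, 5]
// goes through intersectionOneWrapping:
//   other.contains(wrapping.right = 2)  -> true,  add (0, 2]
//   other.contains(wrapping.left = 10)  -> false, nothing added
// Result: { (0, 2] }
Range wrapping = new Range(new BigIntegerToken(new BigInteger("10")),
                           new BigIntegerToken(new BigInteger("2")));
Range plain    = new Range(new BigIntegerToken(new BigInteger("0")),
                           new BigIntegerToken(new BigInteger("5")));
Set<Range> overlap = wrapping.intersectionWith(plain);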