您的位置:首页 > 编程语言

cassandra源代码总结

2014-11-01 15:25 78 查看
1. Org.apache.cassandra.dht包

   a.接口RingPosition   

     public Token getToken();

     public
boolean
isMinimum(IPartitioner
partitioner);

     这个类是ring环的一个token位置,类中有两个方法getToken()返回token值,另外一个方法isMinimum()用来判断token是否是最小的token值,这个接口经常被继承

   b.类Range

      这个类描述的hash环的一段区间,包括左端点和右端点,类中封装了很多关于环的区间的API,包括判断区间是否相交、区间是否包含、区间的差集、区间是否相等等,此类主要用于关于hash环的区间一些操作。至于说这些API什么时候被调用,根据实际情况主要看API的作用而定。

      public
boolean
contains(Range<T> that) ;//用于判断两个range是否相等;

      public
boolean
intersects(Range<T> that)  ;//判断range是否相交  

      public
boolean
equals(Object o);//判断是否相等

   c.抽象类Token 继承RingPosition

      这个类是hash环上的一个token,token的值决定了节点的位置(节点是按照token的值按照从小到大的顺序排列,)也决定了副本的放置位置(副本的第一份按照分区器partitioner放置在比他的token值大的第一个节点上,其他的按照副本放置策略依次放置)

Token的概念非常重要,token也是为副本和节点的唯一区分,Token类调用的很多。因为Token本身也是抽象类,所以被很多类继承,而且被什么类继承,那么token的值就是什么类型。如果被LongToken继承,那么token的值是Long型的,如果被StringToken继承,那么token就是string类型的

 Compare()比较函数,equals()判断token是否相等,hashcode()生成token的hash值;

TokenFactory是key值和token相互转化的类,里面有封装有很多的方法
LongToken继承了Token,因为Token是泛型的,所以类型Long就替代了T,则Token的值为Long,其他的道理一样。
                     
IPartitoner是分区器的接口,然后抽象类AbstractPartitioner继承IParttioner,RandomPartitioner、Murmur3Partitioner、LocalPartitioner继承抽象类AbstractPartitioner,IPartitioner封装了关于token的API,有获取中间token函数midpoint()函数,获取最小的Token,以及token的生成函数getToken(ByteBuffer key),这是最重要的方法,里面就是token的生成算法,cassandra默认的分区器就是Murmur3Partioner,生成token的hash算法就是MD5hash值, 
MurmurHash.hash3_x64_128();
Org.apache.cassandra.gms
 
这个包主要是关于cassandra的gossip协议节点之间通信过程,从而达到数据一致。主要流程是A节点向任意一个在线的节点(假如记做B)发送GossipDigestSYN(SYN消息里面封装了A节点存储的其他的节点的信息)消息,EndpointState就是封装节点的信息,类中包含了HeartBeatState心跳信息,还有应用数据信息applicationState等等,B接收到A发送的SYN信息之后,首先更新本地的心跳队列,当满足远端generation>本地generation或者当远端generation=本地generation&&远端maxversion>本地maxversion,更新本地的心跳队列。还要根据比较摘要数据和本地数据,生成要返回的摘要数据和状态数据,然后封装成GossipDigestACK消息发送至A,A接收到消息之后,首先更新自己的心跳队列,然后更新本地的状态数据,生成要返回的状态数据,然后封装成
GossipDigestACK2消息,然后发送给B,B接收到A发送的消息之后,更新自己的心跳队列然后更新自己本地的数据。下边画图解释工作原理。
      
 
 
 
代码部分解释:
EndpointState类包括HeartBeatState和ApplicationState两个类的对象,存储了心跳的信息和应用数据的信息,
 
  
public void run()
{
try
{
//wait on messaging service to start listening
MessagingService.instance().waitUntilListening();

/* Update the local heartbeat counter. */
endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().updateHeartBeat();
if (logger.isTraceEnabled())
logger.trace("My heartbeat is now " + endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().getHeartBeatVersion());
final List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
Gossiper.instance.makeRandomGossipDigest(gDigests);

if (gDigests.size() > 0)
{
GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
DatabaseDescriptor.getPartitionerName(),
gDigests);
MessageOut<GossipDigestSyn> message = new MessageOut<GossipDigestSyn>(MessagingService.Verb.GOSSIP_DIGEST_SYN,
digestSynMessage,
GossipDigestSyn.serializer);
/* Gossip to some random live member */
boolean gossipedToSeed = doGossipToLiveMember(message);

/* Gossip to some unreachable member with some probability to check if he is back up */
doGossipToUnreachableMember(message);

/* Gossip to a seed if we did not do so above, or we have seen less nodes
than there are seeds.  This prevents partitions where each group of nodes
is only gossiping to a subset of the seeds.

The most straightforward check would be to check that all the seeds have been
verified either as live or unreachable.  To avoid that computation each round,
we reason that:

either all the live nodes are seeds, in which case non-seeds that come online
will introduce themselves to a member of the ring by definition,

or there is at least one non-seed node in the list, in which case eventually
someone will gossip to it, and then do a gossip to a random seed from the
gossipedToSeed check.

See CASSANDRA-150 for more exposition. */
if (!gossipedToSeed || liveEndpoints.size() < seeds.size())
doGossipToSeed(message);

doStatusCheck();
}
}
catch (Exception e)
{
logger.error("Gossip error", e);
}
}
}


从上面的代码可以看出Gossiper.instance.makeRandomGossipDigest(gDigests);用来生成GossipDigest,然后调用doGossipToLiveMember(message)向在线的节点发送message消息,然后调用doGossipToUnreachableMember(message)方法向下线的节点发送message消息,满足if
(!gossipedToSeed ||
liveEndpoints.size() < seeds.size())  调用doGossipToSeed()方法向种子节点发送消息。

doGossipToLiveMember方法调用sendGossip()方法,

  private boolean doGossipToLiveMember(MessageOut<GossipDigestSyn> message)
{
int size = liveEndpoints.size();
if (size == 0)
return false;
return sendGossip(message, liveEndpoints);
}
private boolean sendGossip(MessageOut<GossipDigestSyn> message, Set<InetAddress> epSet)
{
List<InetAddress> liveEndpoints = ImmutableList.copyOf(epSet);

int size = liveEndpoints.size();
if (size < 1)
return false;
/* Generate a random number from 0 -> size */
int index = (size == 1) ? 0 : random.nextInt(size);
InetAddress to = liveEndpoints.get(index);
if (logger.isTraceEnabled())
logger.trace("Sending a GossipDigestSyn to {} ...", to);
MessagingService.instance().sendOneWay(message, to);
return seeds.contains(to);
}


SendGossip()方法就是从liveEndpoints随机选取一个节点然后发送消息,而对SYN消息的处理在GossipDigestSynVerbHandler类中进行,处理逻辑如下所示:

doSort(gDigestList);

List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>();
Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
logger.trace("sending {} digests and {} deltas", deltaGossipDigestList.size(), deltaEpStateMap.size());
MessageOut<GossipDigestAck> gDigestAckMessage = new MessageOut<GossipDigestAck>(MessagingService.Verb.GOSSIP_DIGEST_ACK,
new GossipDigestAck(deltaGossipDigestList, deltaEpStateMap),
GossipDigestAck.serializer);
if (logger.isTraceEnabled())
logger.trace("Sending a GossipDigestAckMessage to {}", from);
MessagingService.instance().sendOneWay(gDigestAckMessage, from);

调用doSort()方法对gDigestList进行排序,然后MessageingService.instance.sendOneWay();

发送消息SYN消息,对SYN消息处理是在GossipDigestAckVerbHandler类,主要处理的代码如下所示:
GossipDigestAck gDigestAckMessage = message.payload;
List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
Map<InetAddress, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap();
logger.trace("Received ack with {} digests and {} states", gDigestList.size(), epStateMap.size());

if (epStateMap.size() > 0)
{
/* Notify the Failure Detector */
Gossiper.instance.notifyFailureDetector(epStateMap);
Gossiper.instance.applyStateLocally(epStateMap);
}

if (Gossiper.instance.isInShadowRound())
{
if (logger.isDebugEnabled())
logger.debug("Finishing shadow round with {}", from);
Gossiper.instance.finishShadowRound();
return; // don't bother doing anything else, we have what we came for
}

/* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
for (GossipDigest gDigest : gDigestList)
{
InetAddress addr = gDigest.getEndpoint();
EndpointState localEpStatePtr = Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
if (localEpStatePtr != null)
deltaEpStateMap.put(addr, localEpStatePtr);
}

MessageOut<GossipDigestAck2> gDigestAck2Message = new MessageOut<GossipDigestAck2>(MessagingService.Verb.GOSSIP_DIGEST_ACK2,
new GossipDigestAck2(deltaEpStateMap),
GossipDigestAck2.serializer);
if (logger.isTraceEnabled())
logger.trace("Sending a GossipDigestAck2Message to {}", from);
MessagingService.instance().sendOneWay(gDigestAck2Message, from);

 
这部分的代码主要是deltaEpStateMap假如节点的新状态信息,然后封装成GossipDigestAck2消息,然后发送出去。然后对ACK消息的处理在GossipDigestAck2VerbHandler类中,

 
 上面的代码就是对于ACK消息的处理,
       Gossiper.instance.applyStateLocally(remoteEpStateMap);他的作用就是更新节点的状态信息。
 
Org.apache.cassandra.locator

IEndpointSnitch接口中封装有获取机架、获取数据中心、按照给定节点距离排序等等方法。
副本的继承结构:

三个副本策略大同小异,就一个SimpleStrategy为例: 一个副本的key可以由partitioner求出token的值,然后根据token和副本放置策略求出放置副本的所有的节点,调用getCachedEndpoints()方法是先从缓存中查找,验证缓存中的版本,如果更新的话就从缓存中获取,否则的话就调用getNaturalEndpoints()方法获取所有的节点,方法调用的时序图如下所示:

从上面的代码可以看出,SimpleStrategy处理由token找到存储的节点的方法是:首先tokenmetadata从给定的token开始,然后按照从大到小的顺序查找节点,如果发现节点不在endpoints里面就添加进去,一直到节点的个数等于副本因子。这就是跟SimpleStrategy存储副本的策略完全一致,即第一份副本放置在比key的token值大的第一个节点上,然后其他的按照顺时针的方向依次放置在节点上。
 
NetworkTopologyStrategy处理的方法跟SimpleStrategy一致,就不多讲啦!
 
4 org.apache.cassandra.service
 
CassandraDaemon类 负责cassandra启动的类,cassandra启动会做很多事情,比如说初始化各种配置,还有加载keyspace,还有就是compaction压缩,以及是否需要bootstrap。下面是cassandra启动时主要的序列图。
 

 
上面的启动过程重点在于节点首次加入集群时需要bootstrap(非种子节点),bootstrap的过程如下:首先判断节点是否需要bootstrap,判断的条件有三个,一是配置项auto_bootstrap是否为真,第二判断bootstrap是否完成。第三判断节点是否是种子节点,如果配置项auto_strap为真并且bootstrap过程并没有结束,而且节点不是种子节点的话就要bootstrap,获取节点的token值,方法是调用方法bootstrapTokens()获取;首先判断initialtokens的值是否大于0,囚大于0的话就把initialTokens的值转化成Token类型加入到tokens并返回。否则判断numtokens的值,大于1时调用getRandomTokens方法获取分区器,利用分区随机生成token值,如果该token'的值在tokenmetadata不存在则添加tokens并返回。主要代码如下所示:

Org.apache.casandra.thrift
 
CassandraServer类,cassandra的响应客户端的thrift服务器,继承Cassandra.Iface接口,跟客户端的一致,客户端每发送一个请求,服务器端都会用相同的方法处理,因为方法处理的流程基本一样所以就举一个例子:get(ByteBufferkey, ColumnPathcolumn_path,
ConsistencyLevel consistency_level)方法获取Column,具体代码如下,

 
 
客户端查询数据调用get()方法,get方法调用send_get()方法向服务器发送请求,send_get()方法里创建get_args对象,然后设置参数,然后调用sendBase方法通过各种协议发送到服务器端,服务器端调用对应的get方法()处理请求,
 

 
服务器端处理完之后会返回ColumnOrSuperColumn对象给客户端,客户端会调用receiveBase方法接收此对象。
 
Org.apache.cassandra.config
 
CFMetaData类是关于ColumnFamily的信息,Config类是关于cassandra的配置信息,这些信息可以从cassandra的配置文件cassandra.yaml中获得,DatabaseDescriptor可以获取系统的信息,这些信息是Config类中的信息在经过一些条件验证得到的。
Schema类存的关于cassandra的keyspace等等信息,修改keyspace必须要修改schema信息,
YamlConfigurationLoader类加载配置文件cassandra.yaml信息,
 

上面的代码就是加载cassandra.yaml文件信息并存到对应的属性上。
 
 
Org.apache.cassandra.cli
 
CliMain类主要是cassandra编译生成cassandra的客户端。
CliSessionState存储的是cassandra连接状态的信息,包括服务器的名字,thrift的客户端,用户名和密码等信息。

Org.apache.cassandra.auth
 
  cassandra的授权机制,cassandra的各种权限在这里设置。通过对用户赋予某种权限来限制用户的行为,Permission类还有用户具有的的权限,create,modify,drop,select,alter等等。
 
Org.apache.cassandra.cache
 
cassandra的缓存设置,这个类没有细看。
 
 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  cassandra nosql