您的位置:首页 > 其它

cassandra 学习笔记

2010-04-11 17:10 337 查看
1). cassandra任何一个节点都可以被客户端访问。

2).
对cassandra某个节点的访问是通过调用org.apache.cassandra.service.Cassandra的内部类Client的相
应接口实现的。

3).
2)中的Cassandra这个类包含了很多内部类和一个接口(Iface)。其中的Client和Processor两个内部类都是对Iface的实
现,这保证了他们的内部的所有方法是一一对应的。

4).
当Client这个内部类中的某个方法被调用的时候,该方法会用他内部的send_开头的方法发送消息,并且用recv_开头的方法接收返回的内容,容,
返回的内容可能是我们想要的数据,也可能是个异常的消息,如果是异常的消息,则会在客户端生成一个相应的异常并抛出,

5).
Client端send_和recv_方法同目标节点的交互是分别通过oprot和iprot的实例完成的,这两个实例是负责输入输出的,具体的功能的实
现在libthrift.jar中。

6).
节点和客户端通信的连接是由libthrift.jar中的TThreadPoolServer的实例实现的,这个实例在该节点最初启动的时候被生成,并
且该实例内部还保有一个2)中提到的Processor实例。TThreadPoolServer实例给Processor实例提供了输入输出实例
iprot和oprot,并且通过调用Processor的processprocess(TProtocol iprot, TProtocol
oprot)接口来进一步的向内传递消息。

7).
节点最初启动的初始话过程是在org.apache.cassandra.service.CassandraDaemon的setup()中完成的。

8). 在Processor的processprocess(TProtocol iprot, TProtocol
oprot)会解析iprot中传入的客户端的请求,并首先解析出要调用函数的函数名字,然后通过查询processMap_来决定究竟由那个
ProcessFunction实例来接收处理消息,相应的ProcessFunction实例的process(int seqid,
TProtocol iprot, TProtocol oprot)被激活并开始全权负责消息的处理和反馈。

9). 相应的的ProcessFunction的实例主要负责三件事:i,进一步处理iprot传入的消息
ii,将详细的信息转发给iface的相应方法处理
iii,将得到的反馈通过oprot返回给客户端。这里的iface实例实际上是
org.apache.cassandra.service.CassandraServer的一个实例,在Processor的实例创建的时候(节点启
动的时候)被装入了Processor实例,但是由于ProcessFunction类是Processor的内部类,所以
ProcessFunction的实例也能直接访问。

10)以上可知,最终客户端的信息是交给CassandraServer的相应方法来处理的,而thrift的相关功能只是负责了客户端和节点间的
交互(9160端口),而节点之间的交互并没有使用thrift的资源。

源码中对节点的如下称呼应该是等价的: end point , node , machine , datacenter , host。

cassandra节点的启动main()在类org.apache.cassandra.service.CassandraDaemon中,细节在
setup()中。过程中会start一个CassandraServer的实例peerStorageServer。
peerStorageServer在建立的时候,内部会实例化一个
StorageService实例,在该StorageService实例初始化的过程中,该节点的所有功能服务会被配置激活,这些操作是在
StorageService的默认构造器中完成的。

StorageService的构造器中大致做了如下几件事情:

1)生成一个storageLoadBalancer_s实例负责负载均衡,现在还没有明白原理。

2)生成一个endPointSnitch_实例,这个提供了对两个end_point进行比较的一个途径,基本上是判断两个end_point是不是同
一个ip等

3)启动了MessagingService,并且注册了一些handler实例。MessagingService是负责该end_point与其他
end_point进行通信的。两个节点间通信的内容被封装在一个Message的实例里面。

比如,如果节点A想向节点B获得一定的数据,那么A需要通过自己的MessageService向节点B发送一个Message实例,这个实例里面包含了
如下信息:这个请求的类型(属于什么stage) ,
这个请求要调用的B的哪个handler来处理,以及这个请求的其他具体内容。当节点B接收到节点A发送过来的Message实例后,会将根据这个
Message实例内部指定的handler信息,将该实例转发相应的handler去处理。当然这样做的前提是这个指定的handler已经在B节点注
册了,而这个注册过程就是在StorageService启动的时候完成的。

4)consistencyManager_:还没明白什么意思。

5)StageManager的配置:

这个stage的概念来自于“SEDA“( staged event driven
architecture)中的"S",参考http://www.eecs.harvard.edu/~mdw/papers/seda-
sosp01.pdf。

大致意思好像是可以将一个工作流程分为若干个阶段(stage),然后给各个stage动态的分配线程资源来处理stage内定义的逻辑。stage和
stage之间的通信是通过任务队列完成的,当一个stage的逻辑执行完后,如果需要调用下一个stage来继续执行,那么就往下一个stage保有的
任务队列中写入必要的任务信息。

这样的SEDA结构的好处是:在实际的运行当中各个stage的忙闲程度是不一样的,可以通过将比较闲的stage上的线程资源分配给同期比较忙的
stage来实现效率上的提高。

在cassandra中,还是以3)中例子说明,在A节点发往B节点的Message实例中有一个“这个请求的类型”的信息,这个信息保存在
Message实例内header实例的type_上,是一个字符串。这个字符串标明了当MessageService获取了Message实例后究竟由
那个
stage来负责执行指定的handler对象。因为handler对象只是定义了对Message的处理逻辑,所以需要stage里面的线程来对其进行
执行。

当前的stage只有4种,依次在StroageService的 /* All stage
identifiers
*/标注下被定义,分别负责不同的任务,“ROW-READ-STAGE”是负责读取的,其他的含义还没有完全搞清楚,大概是负责修改,数据压缩和
http后台页面信息接收的。

StageManager部分的源码还没哟看到,可能是负责各个stage间线程调度的吧。

6)设置nodepicker_定义了当前节点对周围节点的查询策略,具体的还不清楚

当某个end
point拿到一个key(比如"王老六")并想取出他的相关信息的时候,这个节点是怎么知道这个key的相关信息是存放在哪些节点中的呢?

以下将用从客户端拿到的"get_clomun"请求为例,进行说明:

"get_column"的相关信息会在CassandraServer的get_column(String tablename,
String key, String
columnPath)方法中被封装成一个readCommand实例,该对象简单包含了请求信息,另外也提供了一些别的方法。

然后该command实例会最终被交给
StorageProxy.readProtocol(command,StorageService.ConsistencyLevel.WEAK)来
处理,并期待这个方法能返回一个和key关联的columnFamily的数据,这些数据被装在一个row对象里返回的。

在上述的StorageProxy.readProtocol(...)这个方法中,首先要做的事情就是根据提交的key("王老六")确定他的相关信息
内容是存放在那些end point中的。而这个寻找end
point的工作由StorageService.instance().getNStorageEndPoint(command.key)来完成。

实际上,上述的StorageService的getNStorageEndPoint(String
key)方法调用了他持有的实例"nodePicker_"的getStorageEndPoints(Token token) 来完成寻找end
point这件事情。

题外话:StorageService这个实例提供了本地节点数据存储和与其他节点交互的服务,这个实例是在节点启动之初被建立的,在建立的时候该实例会
初始化一个nodePicker_放在其内部,这个nodePicker主要负责本地节点同其他节点的交互规则,也就是如何选择其他节点的策略。当前的策
略有两种:RackAwareStrategy--机架敏感 和
RackUnawareStrategy--非机架敏感,默认策略是RackUnawareStrategy。

所以默认情况下,nodePicker_就是一个RackUnawareStrategy实例,而他的
getStorageEndPoints(Token token)将被用来查找合适我们给出的key("王老六")的end
point,这里合适的意思是“存有关于这个key的信息”。而这里提到的token是由key封装得来的,他对key进行了hash,并且继承了
Comparable接口。也就是说,这个token实例是可以和其他同类token比较大小的,而这样做的目的是为了“可以在Token组成的list
上进行二分查询Collection.binarySort,并定位查找目标的在list中的相对位置”--这种查找操作将被经常使用。默认情况下
token是一个BigIntegerToken的实例。

nodePicker_最终会将查找的工作交给getStorageEndPoints(Token token,
Map<Token, EndPoint>
tokenToEndPointMap)来做,这个方法会返回一个查找到的end_point的数组。以下是对RackUnawareStrategy中
的相应方法的说明(因为默认是使用这个方法):

//此处的tokenToEndPointMap保存了该节点知道的所有其他节点,并且可以根据他们的token查找到

//此处的token是我们要查找的key("王老五")生成的token,他跟上面end
point的token生成的算法是相同的,也都是BigIntegerToken

public EndPoint[] getStorageEndPoints(Token token, Map<Token,
EndPoint> tokenToEndPointMap)

{

//根据key寻址的切入点

int startIndex;

List<EndPoint> list = new ArrayList<EndPoint>();

int foundCount = 0;

//将key的set装入一个ArrayList中,并完成排序,(一位token是comparable的)

List tokens = new
ArrayList<Token>(tokenToEndPointMap.keySet());

Collections.sort(tokens);

//在排好序的tokens中二叉树搜索token,并返回token所在的位置的index

//个人认为:根本别指望tokens里面能有我们要找的token,因为一个是end
point的token集合,一个是查找key("王老六")生成的

//token,生成的时候都使用了hash,并且加入了大随即数,重合的概率很低。

//这样做的主要目的是为了获得下面这个index,(参见binarySearch的说明可知)。

//也就是:虽然下面这条语句不大可能从tokens中取出东西,但是生成的index将告诉我们,哪个end
point的token离我们给出的

//key"王老六"的token“最近”。而且,在节点情况固定下来的情况下,用当前这种方法,

//key"王老六"的token确定的对应的end point也是不会改变的(每次都会取到这个end point,除非end
point本身注册的变了)。

//也就是说:这就成了一种方法,一种能够根据数据key("王老六")定位到一个end point的方法,

//如果写数据时使用了这种策略找到一个end point
节点然后写进数据,那么手持相应的key在读数据的时候,同样使用这个策略,也

//相应的能找到当时存入数据的那个节点,并把数据读出来。

int index = Collections.binarySearch(tokens, token);

//基本上这个条件里面的语句肯定会被执行

//那如果真的用key("王老六")从tokens中找到对应了呢?这个方法里的策略就是,那就直接拿这个被确定的节点来当
“邻居”节点了

//而key的tokens和end_point的token之所以能混在一起说话,是因为他们都是BigIntegerToken,在同一个数据空间中

if(index < 0)

{

//以下的运算将把key("王老六")的"邻居"节点给"翻译"出来

index = (index + 1) * (-1);//得到可以插入的位置

if (index >= tokens.size())

index = 0;

}

int totalNodes = tokens.size();

// Add the node at the index by default

//把我们找到的"邻居"节点放到一个要返回的list中,当然,对于一个key("王老六")的信息还可能在其他节点上存有副本(replicas)

//再下面的操作会将这些存有副本的节点也一同取出

list.add(tokenToEndPointMap.get(tokens.get(index)));

foundCount++;

//本来index是从0...一直到size()的一条线,如果我们用如下的方式选取index,就好似将所有的index数值放在了一个圆圈上,然后


//从小到大的方向在选取下一个index。startIndex就是一个切入点,本质上Partitioner提供了这个可被"切入"的结构和线索

//猜想这也是为什么叫Partitioner的原因吧

startIndex = (index + 1)%totalNodes;

// If we found N number of nodes we are good. This loop will
just exit. Otherwise just

// loop through the list and add until we have N nodes.

// replicas_对应storage-conf.xml中的最大副本数量

//将存有副本的节点也一同取出,就是最初"邻居"节点往后的若干个节点,具体若干几个是在storage-conf.xml的

//<ReplicationFactor>标签中定义的

for (int i = startIndex, count = 1; count < totalNodes
&& foundCount < replicas_; ++count, i = (i+1)%totalNodes)

{

if( ! list.contains(tokenToEndPointMap.get(tokens.get(i))))

{

list.add(tokenToEndPointMap.get(tokens.get(i)));

foundCount++;

}

}

//统一下节点监听端口的信息

retrofitPorts(list);

return list.toArray(new EndPoint[list.size()]);

}

(一)依赖:cassandra.jar

libthrift.jar

(二)连接:

//该方法将返回一个Cassandra.Client实例,该实例包含和server端指定节点会话的API

public Cassandra.Client getClient()

{

//192.168.0.169为想连接到的某个节点的ip,9160为端口

TSocket socket = new TSocket("192.168.0.169", 9160);

TTransport transport_ = socket;

TBinaryProtocol binaryProtocol = new TBinaryProtocol(transport_,
false, false);

Cassandra.Client cassandraClient = new
Cassandra.Client(binaryProtocol);

try

{

transport_.open();

}

catch(Exception e)

{

// Should move this to Log4J as well probably...

System.err.println("Exception " + e.getMessage());

e.printStackTrace();

}

return cassandraClient;

}

(三)API

1)该方法向key的指定path插入一个数据,另外想要对key某指定path的数据进行修改也使用这个方法

代码中的cli是一个Cassandra.Client实例

public void test_insert()

{

try {

String tableName = "Table1";

String key = "testkey";

String columnFamily = "Standard1";

String columnName = "testColumn";

String value = "testValue";

//对数据进行定位

String path = columnFamily+":"+columnName;

//向cassandra中插入一条数据

cli.insert(tableName,key,path,value.getBytes(),System.currentTimeMillis(),true);

} catch (InvalidRequestException e) {

e.printStackTrace();

} catch (TException e) {

e.printStackTrace();

} catch (UnavailableException e) {

e.printStackTrace();

}

}

2)该方法试用Cql语句的方式发送请求,有一个问题就是当请求的参数或不正确的时候,方法不会报错,只是返回为空;

但是在语法出现错误的时候还是会有InvalidRequestException抛出,提示Unresolved compilation
problems

代码中的cli是一个Cassandra.Client实例

public void test_excuteQuery()

{

try {

cli.executeQuery("set
Table1.Standard1['testKey2']['testColumn']='testValue2'");

CqlResult_t crt = cli.executeQuery("get
Table1.Standard1['testKey2']");

List<Map<String, String>> rs =
crt.getResultSet();

//遍历显示所有column_t中的内容

if(rs != null){

for(int i = 0 ; i < rs.size() ; i++){

Map<String , String> map = rs.get(i);

for(String key : map.keySet()){

System.out.println(key+" : "+map.get(key));

}

}

}else{

System.out.println("result set is null");

}

} catch (TException e) {

e.printStackTrace();

}

}

3)根据path信息获取一个column

path是一个定位信息,standard column family是两层 super column family是三层

superColumnFamily:superColumn:standardColumn。

public void test_get_column()

{

String tableName = "Table1";

String key = "testKey2";

String columnFamily = "Standard1";

String column = "testColumn";

String path = columnFamily+":"+column;

try {

column_t cmt = cli.get_column(tableName, key, path);

//以下两个是等价的

System.out.println(cmt.getColumnName());

System.out.println(cmt.getFieldValue(1));

//以下两个是等价的

System.out.println(new String(cmt.getValue(),"UTF-8"));

System.out.println(new String((byte[])
cmt.getFieldValue(2),"UTF-8"));

//以下两个是等价的

System.out.println(cmt.getTimestamp());

System.out.println(cmt.getFieldValue(3));

} catch (InvalidRequestException e) {

e.printStackTrace();

} catch (NotFoundException e) {

e.printStackTrace();

} catch (TException e) {

e.printStackTrace();

} catch (UnsupportedEncodingException e) {

e.printStackTrace();

}

}

4)

//术语解释:每个key都对应一个Row的信息这个Row的信息又按照columnFamliy 分成了很多slice

//给定table,key和columnFamily的信息,

//此方法将返回,该key在当前columnFamily的slice中有都少个column元素

//该例子中key:"testKey2"在columnFamily:"Standard1"中的slice有几个column元素

public void test_get_column_count()

{

String tableName = "Table1";

String keyName = "testKey2";

String columnFamily = "Standard1";

try {

System.out.println(cli.get_column_count(tableName, keyName,
columnFamily));

} catch (InvalidRequestException e) {

e.printStackTrace();

} catch (TException e) {

e.printStackTrace();

}

}

5) //暂不确定

//根据给定的时间点,取出以后的对应给定key的一组columns

//但是好像在时间上有延迟,这个不确定

public void test_get_columns_since()

{

String tableName = "Table1";

String keyName = "testKey2";

String columnFamily = "Standard1";

Long sinceTime =
System.currentTimeMillis()-5000*1000;//取当前时间之前120秒以内的

try {

List<column_t> cmts = cli.get_columns_since(tableName ,
keyName , columnFamily , sinceTime);

System.out.println("cmts size : "+cmts.size());

} catch (InvalidRequestException e) {

e.printStackTrace();

} catch (NotFoundException e) {

e.printStackTrace();

} catch (TException e) {

e.printStackTrace();

} catch (UnsupportedEncodingException e) {

e.printStackTrace();

}

}

6)//range queries may only be performed against an order-preserving
partitioner

//如果系统试用的节点定位的RandomPartitioner的话那么这个方法将不能试用,好处是跟均衡的存储

//此方法尚在研究中

public void test_get_key_range()

{

try {

cli.get_key_range("Table1", "??", "??", 1);

} catch (InvalidRequestException e) {

e.printStackTrace();

} catch (TException e) {

e.printStackTrace();

}

}

7) //术语解释:每个key都对应一个Row的信息这个Row的信息又按照columnFamliy 分成了很多slice

//每一个columnFamily都存在一个排序,或按照name或按照time

//该方法将取出对应某个key的一定区间段内的column元素

public void test_get_slice()

{

String tableName = "Table1";

String keyName = "testKey2";

String columnFamily = "Standard1";
//想要从哪个columnFamily中获取column

int start = 0;//起始位置

int count = 10;//获取数量

try {

List<column_t> cmts = cli.get_slice(tableName,
keyName, columnFamily, start, count);

System.out.println("cmts size : "+cmts.size());

} catch (InvalidRequestException e) {

e.printStackTrace();

} catch (NotFoundException e) {

e.printStackTrace();

} catch (TException e) {

e.printStackTrace();

} catch (UnsupportedEncodingException e) {

e.printStackTrace();

}

}

8)//通过一个column的名字数组来查找相对应的属于给定key的slice(一组column 元素)

public void test_get_slice_by_names()

{

String tableName = "Table1";

String keyName = "testKey";

String columnFamily = "Standard1";

List<String> columnNameList = new
ArrayList<String>();

columnNameList.add("testColumn");

try{

List<column_t> cmts =
cli.get_slice_by_names(tableName, keyName, columnFamily,
columnNameList);

}catch(Exception e){

e.printStackTrace();

}

}

9)//描述一个table中的各个columnFamily的基本信息

public void test_describe_table()

{

try {

System.out.println(cli.describeTable("Table1"));

} catch (TException e) {

e.printStackTrace();

}

}

10)//批量的针对一个key插入数据

public void test_batch_insert()

{

try{

String tableName = "Table1";

String keyName = "testkey2";

//当下的Map保存了ColumnFamily到他内部的column元素列表的一个映射

//所以String 应该保存 对应columnFamily的名字。

//column_t(column元素)包含:columnName , value , timestamp 三要素

Map<String , List<column_t>> CFmap = new
HashMap<String , List<column_t>>();

//制作两个column_t实例

column_t cmt1 = new column_t("test_column_name_1" ,
"test_column_value_1".getBytes() , System.currentTimeMillis());

column_t cmt2 = new column_t("test_column_name_2" ,
"test_column_value_2".getBytes() , System.currentTimeMillis());

//制作一个column_t的list

List<column_t> cList = new
ArrayList<column_t>();

cList.add(cmt1);

cList.add(cmt2);

//制作一个将其写入CFmap中

String columnFamilyName = "Standard1";

CFmap.put(columnFamilyName,
cList);//这样在以后的插入操作中,相应的column_t将插入"Docin"这个family中

batch_mutation_t bmt = new batch_mutation_t(tableName ,
keyName , CFmap);//完成插入的key的定位

//执行这个针对相应key和colmunFamily的batch_insert

cli.batch_insert(bmt, true);

}catch(Exception e){

e.printStackTrace();

}

}

11)//同insert()相对的反向操作

//当我们试图查询一个不存在的column_t元素的时候,会有"NotFoundException"异常被抛出

public void test_remove()

{

String tableName = "Table1";

String columnFamily = "Standard1";

String columnName = "removetest";

String keyName = "removetestkey";

String value = "removetestvalue";

String path = columnFamily+":"+columnName;

try{

//先将相应的column元素写入

cli.insert(tableName, keyName, path, value.getBytes(),
System.currentTimeMillis(), true);

//尝试读出信息

column_t cmt = cli.get_column(tableName, keyName, path);

displayColumn(cmt);

//删除这条信息

cli.remove(tableName, keyName, path,
System.currentTimeMillis(), true);

}catch(Exception e){

e.printStackTrace();

}

//尝试读出信息

column_t cmt2;

try {

cmt2 = cli.get_column(tableName, keyName, path);

displayColumn(cmt2);

} catch (InvalidRequestException e) {

e.printStackTrace();

} catch (NotFoundException e) {

System.out.println("NotFoundException 异常抛出,column_元素已经被删除");

} catch (TException e) {

e.printStackTrace();

} catch (UnsupportedEncodingException e) {

e.printStackTrace();

}

}

12)//插入一个superColumn元素

//不含有内容的superColumn不能同过该方法被创建,会抛出异常

public void test_batch_insert_super_t()

{

try {

String tableName = "Table1";

String keyName = "superkeytest";

String superColumnFamilyName = "Super1";

//产生1个superColumn元素

//首先产生1个column_t的list

List<column_t> Clist = new
ArrayList<column_t>();

//当两个名字一样的时候,先前的会被覆盖

Clist.add(new
column_t("stardcolumninsuper1","stardcolumn1".getBytes(),System.currentTimeMillis()));

Clist.add(new
column_t("stardcolumninsuper2","stardcolumn2".getBytes(),System.currentTimeMillis()));

//产生一个superColumn元素

String superColumnName = "superColumn3";

superColumn_t sct = new superColumn_t(superColumnName ,
Clist);

//将这个元素装入到一个superColumn_t的List中

List<superColumn_t> SClist = new
ArrayList<superColumn_t>();

SClist.add(sct);

//将这个super_column
的list连同对应的superColumnFamily的名字一起放入一个hashmap中

Map<String , List<superColumn_t>> cfmap = new
HashMap<String , List<superColumn_t>>();

cfmap.put(superColumnFamilyName, SClist);

//区别于一般的,这里要求的column元素是superColumn元素

//将整理好的信息关联一个key后生成一个mutation

batch_mutation_super_t bmst = new
batch_mutation_super_t(tableName,keyName,cfmap);

boolean block = true;

//执行这个mutation的插入操作

cli.batch_insert_superColumn(bmst, block);

} catch (InvalidRequestException e) {

e.printStackTrace();

} catch (UnavailableException e) {

e.printStackTrace();

} catch (TException e) {

e.printStackTrace();

}

}

13)//获取一个SuperColumn中的信息

public void test_get_superColumn()

{

String tableName = "Table1";

String keyName = "superkeytest";

String superColumnFamilyName = "Super1";

String superColumnName = "superColumn3";

String path = superColumnFamilyName+":"+superColumnName;

try {

superColumn_t sct = cli.get_superColumn(tableName, keyName,
path);

if(sct != null){

System.out.println("Super Name : "+sct.getName());

System.out.println("Size : "+sct.getColumnsSize());

List<column_t> clist = sct.getColumns();

}else{

System.out.println("super column is empty");

}

} catch (InvalidRequestException e) {

e.printStackTrace();

} catch (NotFoundException e) {

e.printStackTrace();

} catch (TException e) {

e.printStackTrace();

} catch (UnsupportedEncodingException e) {

e.printStackTrace();

}

}

14)//尝试向一个superColumn中添加column元素(就是column_t对象)

//注意:想superColumnFamily中添加一般的column元素是不被允许的,所以superColumnFamliy中只能有
superColumn,而superColumn中只能有一般的column

//而一般的columnFamily中也只能有一般的column

public void test_insert_standard_to_super()

{

String tableName = "Table1";

String keyName = "superkeytest";

String superColumnFamily = "Super1";

String superColumnName = "superColumn1";

String columnName = "normal_column";

String value = "normal_column_value_add";

String path =
superColumnFamily+":"+superColumnName+":"+columnName;

try {

cli.insert(tableName, keyName, path,
value.getBytes(),System.currentTimeMillis(), true);

//尝试取出这个元素并显示这个元素

column_t cmt = cli.get_column(tableName, keyName, path);

if(cmt != null){

System.out.println("column name :
"+cmt.getColumnName());

System.out.println("column value: "+new
String(cmt.getValue(),"UTF-8"));

System.out.println("column time :
"+cmt.getTimestamp()+"/n");

}else{

System.out.println("column is empty");

}

} catch (InvalidRequestException e) {

e.printStackTrace();

} catch (UnavailableException e) {

e.printStackTrace();

} catch (TException e) {

e.printStackTrace();

} catch (NotFoundException e) {

e.printStackTrace();

} catch (UnsupportedEncodingException e) {

e.printStackTrace();

}

}

15)//术语解释:每个key都对应一个Row的信息这个Row的信息又按照columnFamliy
或superColumnFamily分成了很多slice

//同针对一般的columnFamliy类似,这个方法是获取给定key的某个superColumnFamily的slice,这个slice中的
元素都是superColumn

public void test_get_slice_super()

{

String tableName = "Table1";

String keyName = "superkeytest";

String superColumnFamily = "Super1";

try {

List<superColumn_t> sct =
cli.get_slice_super(tableName, keyName, superColumnFamily, 0, 10);

} catch (InvalidRequestException e) {

e.printStackTrace();

} catch (TException e) {

e.printStackTrace();

} catch (UnsupportedEncodingException e) {

e.printStackTrace();

}

}

16)

//尝试删除一个superColumn节点,和属于该节点的column_t节点

//测试需要通过path来定位这个节点

public void test_remove_super()

{

String tableName = "Table1";

String keyName = "superkeytest";

String superColumnFamily = "Super1";

String superColumn = "superColumn3";

String columnName = "stardcolumninsuper1";

String path = superColumnFamily+":"+superColumn+":"+columnName;

try {

//先建立一个super_column_family 名字测试的时候是"superColumn3"

//产生1个superColumn元素

//首先产生1个column_t的list

List<column_t> Clist = new
ArrayList<column_t>();

//当两个名字一样的时候,先前的会被覆盖

Clist.add(new
column_t("stardcolumninsuper1","stardcolumn1".getBytes(),System.currentTimeMillis()));

Clist.add(new
column_t("stardcolumninsuper2","stardcolumn2".getBytes(),System.currentTimeMillis()));

//产生一个superColumn元素

String superColumnName = "superColumn3";

superColumn_t sct = new superColumn_t(superColumnName ,
Clist);

//将这个元素装入到一个superColumn_t的List中

List<superColumn_t> SClist = new
ArrayList<superColumn_t>();

SClist.add(sct);

//将这个super_column
的list连同对应的superColumnFamily的名字一起放入一个hashmap中

Map<String , List<superColumn_t>> cfmap = new
HashMap<String , List<superColumn_t>>();

cfmap.put(superColumnFamilyName, SClist);

//区别于一般的,这里要求的column元素是superColumn元素

//将整理好的信息关联一个key后生成一个mutation

batch_mutation_super_t bmst = new
batch_mutation_super_t(tableName,keyName,cfmap);

boolean block = true;

//执行这个mutation的插入操作

cli.batch_insert_superColumn(bmst, block);

//尝试删除之

cli.remove(tableName, keyName, path,
System.currentTimeMillis(), true);

} catch (InvalidRequestException e) {

e.printStackTrace();

} catch (UnavailableException e) {

e.printStackTrace();

} catch (TException e) {

e.printStackTrace();

}

}

17)//这个方法目前还不健全

public void test_getProperty()

{

try {

//这事一个未健全的方法,现在只能返回对应"tables"的参数,也就是表名

List<String> Plist =
cli.getStringListProperty("tables");

for(int i = 0 ; i < Plist.size() ; i++){

System.out.println(Plist.get(i));

}

} catch (TException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

//现在只支持如下三个参数

try {

System.out.println(cli.getStringProperty("cluster name"));

//会直接把server端的storage_conf.xml读进来,可能用于做进一步的xml分析

System.out.println(cli.getStringProperty("config file"));

System.out.println(cli.getStringProperty("version"));

} catch (TException e) {

e.printStackTrace();

}

}

一些注意的:

1)cli的实例还有一些诸如sent_*和recv_*(成对出现)的公共方法,这些方法是供上面罗列的相应方法调用的,一般不用管,

2)sent_*负责想服务器端发送消息

3)recv_*将处理返回的所要的结果或处理错误信息抛出相应异常

4)大部分方法可执行的前提是必须有一个精确的key信息,(describe_table和get_key_range除外)。

5)一个superColumnFamily中只能存放superColumn_t元素而不能存放column_t元素

6)一个columnFamily中只能存放column_t元素

1)columnFamily下一个column和多个column的读取区别

2)columnfamily 和superColumnFamily的读取区别

测试机数量:两台,jvm最大使用内存都开到1.3G。

起始key: 1356278962 ;

改变组:

product_name1 : "是一个非常可靠的大规模分布式存储系统"

product_name2 : "中国惨败伊朗丢亚锦赛冠军创34年参赛最耻辱一败"

控制组:

product_value1 : "第一次在主场丢冠军"

product_value2 : "胡雪峰顶替受伤的刘炜"

product_value3 : "朱芳雨都有外线出手机会"

product_value4 : "张庆鹏传球意图太过明显"

product_value5 : "最后一节比赛"

product_value6 : "中国队首发:易建联、王治郅、朱芳雨、王仕鹏、胡雪峰"

product_value7 : "中国队本次亚锦赛首次尝到失利的滋味,在家门口把冠军拱手相让"

product_value8 : "下半场易边再战,中国队仍然如同梦游,球员之间没有形成整体"

//下面这个是一个要存入的“摘要”

product_value9 : "下半场易边再战,中国队仍然如同梦游,球员之间没有形成整体,

单打独斗的进攻模式成功率相当低。伊朗队内外结合,多点得分,

继续扩大分差。朱芳雨传球被断,对手长传快攻,14号球员扣篮得分,

28-53。王治郅强攻得手,造成哈达迪犯规,加罚命中。

王治郅再次溜到篮下,反身投篮得分。王治郅成为中国队的唯一亮点,

持球接连晃过三名球员的防守,投篮得分,加罚再中,

36-53。靠着王治郅的出色发挥,中国队留住翻盘的一线希望。

第三节结束,39-56,分差仍为17分。"

我们有的ColumnFamily: Standard1 Standard2 Super1 Super2

测试1:
向Standard1中写入10万条记录,此时Standard1中只有"product_name"一个column,key是从1356278962
开始往后10万个

insert()进行插入,速度很慢,不可能在一小时内完成

CQL 用时256秒,关掉log后用时55秒

测试2: 然后在product_name1 和
product_name2之间反复修改key-1356279962对应的"produck_name"这个column的内容10万次

CQL:

当config中的ReplicationFactor为1的时候,用时120秒

当config中的ReplicationFactor为2的时候,用时183秒;关掉log后用时42秒

测试3: 将存在的各个key的product_name读取一遍(此时相应columnFamily中只有1个column)

当config中的ReplicationFactor为1的时候:速度相当慢

当config中的ReplicationFactor为2的时候:

CQL:

用时267秒;关掉log后用时145秒

API:

用get_column用时203秒;关掉log后用时135秒

以下均以ReplicationFactor=2来进行测试

测试4: 依照已经存在的各个key,向Standard1中写入10万条控制组里面的column数据

API:

batch_insert() 速度非常慢

CQL:

90万个column元素,其中包括一个类似摘要大小的字符串(200字左右),用时2004秒;关掉log后用时512秒。

测试5: 然后在product_name1 和
product_name2之间反复修改key-1356279962对应的"produck_name"这个column的内容10万次

API:

使用insert()速度相当慢

CQL:

用时190秒;关掉log后用时45秒

测试6: 将存在的各个key的product_name读取一遍(此时相应columnFamily中已经有10个column了)

API:

使用get_column()用时757秒;关掉log后用时188秒

CQL:

用时更慢一些,用时861秒;关掉log后用时202秒

super测试1:在Super1中创建superColumn1,在superColumn1中只创建1个
column-"product_name",然后写入10万条数据,key是从1356278962开始往后10万个

CQL:

用时316秒;关掉log后用时60秒

super测试2:然后在product_name1 和
product_name2之间反复修改superColumn1下key-1356279962对应的"produck_name"这个column的
内容10万次

CQL:

用时195秒;关掉log后用时62秒

super测试3:将存在的各个key的product_name读取一遍(此时相应superColumnFamily中只有1个column)

API:

get_column() 用时235秒;关掉log后用时234秒

CQL:

用时356秒 ;关掉log后用时158秒

super测试4:依照已经存在的各个key,依次向superColumn1中写入10万条控制组里面的column数据

CQL:

用时2058秒;关掉log后,竟然非常慢,不知道为什么,但是重新启动下节点后,速度变快,用时562秒

super测试4:然后在product_name1 和
product_name2之间反复修改superColumn1下key-1356279962对应的"produck_name"这个column的
内容10万次

CQL:

用时190秒,关掉log后用时48秒

super测试6:将存在的各个key的product_name读取一遍(此时相应superColumnFamily中已经有10个column了)

API:

用get_column 用时 356秒;关掉log后用时191秒

CQL:

用时314秒;关掉log后用时208秒

结论:使用superColumn的开销不比column大很多,向columnFamily中加入更多的column对读取速度的影响不大。相比
而言受log和ReplicationFactor的影响更大。另外在写入的时候CQL的效率更高,在读取的时候get_column更好一些。而API
中的其他方法如batch_insert(),insert()效率明显不高。但这只是两个节点的情况,当有更多节点的时候情况可能又会不一样了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: