您的位置:首页 > 编程语言 > Java开发

Cassandra客户端连接的说明---Java版

2011-12-25 14:08 417 查看
在使用Java客户端调用之前,我们需要通过Cassandra CLI在服务器上创建相应的keyspace和column family.

参考:http://wiki.apache.org/cassandra/CassandraCli

进入cassandra/bin目录,运行指令如下

./cassandra-cli

connect 192.168.11.124/9160;

create keyspace LiftDNA_KeySpace;

use LiftDNA_KeySpace;

create column family User_ColumnFamily with comparator = UTF8Type;

或者建立counter:

create column family User_SuperCounter with default_validation_class = CounterColumnType;

如果需要建立其他类型的簇,参见本文底部链接。

然后运行一下Java代码:

import java.util.List;
import java.io.UnsupportedEncodingException;

import java.nio.ByteBuffer;
import java.util.Random;
import org.apache.cassandra.thrift.AuthenticationException;
import org.apache.cassandra.thrift.AuthorizationException;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.KeySlice;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.thrift.TimedOutException;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;

public class Test {

public static void main(String[] args) throws TTransportException, UnsupportedEncodingException, InvalidRequestException, NotFoundException, UnavailableException, TimedOutException, TException, AuthenticationException, AuthorizationException {

TTransport tr = new TFramedTransport(new TSocket("192.168.11.124", 9160));
TProtocol proto = new TBinaryProtocol(tr);

Cassandra.Client client = new Cassandra.Client(proto);
tr.open();
String keyspace = "LiftDNA_KeySpace";
client.set_keyspace(keyspace);
//record id
String key_user_id = "1";
String columnFamily = "User_ColumnFamily";
// insert data
long timestamp = System.currentTimeMillis();
Random r = new Random(timestamp);
Column nameColumn = new Column(ByteBuffer.wrap("name".getBytes()));
nameColumn.setValue(Long.toHexString(r.nextLong()).getBytes());
nameColumn.setTimestamp(timestamp);

Column ageColumn = new Column(ByteBuffer.wrap("age".getBytes()));
ageColumn.setValue(Long.toHexString(r.nextLong()).getBytes());
ageColumn.setTimestamp(timestamp);

ColumnParent columnParent = new ColumnParent(columnFamily);
client.insert(ByteBuffer.wrap(key_user_id.getBytes()), columnParent,nameColumn,ConsistencyLevel.ALL) ;
client.insert(ByteBuffer.wrap(key_user_id.getBytes()), columnParent,ageColumn,ConsistencyLevel.ALL);

//Gets column by key
SlicePredicate predicate = new SlicePredicate();
predicate.setSlice_range(new SliceRange(ByteBuffer.wrap(new byte[0]), ByteBuffer.wrap(new byte[0]), false, 100));
List<ColumnOrSuperColumn> columnsByKey = client.get_slice(ByteBuffer.wrap(key_user_id.getBytes()), columnParent, predicate, ConsistencyLevel.ALL);
System.out.println(columnsByKey);

//Get all keys
KeyRange keyRange = new KeyRange(100);
keyRange.setStart_key(new byte[0]);
keyRange.setEnd_key(new byte[0]);
List<KeySlice> keySlices = client.get_range_slices(columnParent, predicate, keyRange, ConsistencyLevel.ONE);
System.out.println(keySlices.size());
System.out.println(keySlices);
for (KeySlice ks : keySlices) {
System.out.println(new String(ks.getKey()));
}
tr.close();
}
}


所需Java包:

apache-cassandra-1.0.6.jar

apache-cassandra-clientutil-1.0.6.jar

apache-cassandra-thrift-1.0.6.jar

libthrift-0.6.jar

slf4j-api-1.6.1.jar

所有第三方Jar包均可在Cassandra/lib里找到。

此外,我们还可以使用Hector http://rantav.github.com/hector/build/html/index.html


https://github.com/rantav/hector

Hector提供了对Cassandra操作的更高级的封装。

https://github.com/rantav/hector/wiki/Getting-started-%285-minutes%29

连接测试:

import java.util.List;

import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.cassandra.service.ThriftCluster;
import me.prettyprint.hector.api.ddl.KeyspaceDefinition;

public class Test {

    public static void main(String[] args) {
        
        CassandraHostConfigurator cassandraHostConfigurator = new CassandraHostConfigurator("192.168.11.124:9160");
        ThriftCluster cassandraCluster = new ThriftCluster("Test Cluster", cassandraHostConfigurator);
        List<KeyspaceDefinition> keyspaces = cassandraCluster.describeKeyspaces();
        for (KeyspaceDefinition keyspaceDefinition : keyspaces) {
            System.out.println(keyspaceDefinition.getName());
        }
    }
}
参考代码:

https://github.com/rantav/hector/blob/master/core/src/test/java/me/prettyprint/cassandra/service/CassandraClusterTest.java#L84-157

其他测试代码:

import static me.prettyprint.hector.api.factory.HFactory.createColumn;
import static me.prettyprint.hector.api.factory.HFactory.createColumnQuery;
import static me.prettyprint.hector.api.factory.HFactory.createKeyspace;
import static me.prettyprint.hector.api.factory.HFactory.createMultigetSliceQuery;
import static me.prettyprint.hector.api.factory.HFactory.createMutator;
import static me.prettyprint.hector.api.factory.HFactory.getOrCreateCluster;

import java.util.HashMap;
import java.util.Map;

import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.Serializer;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.beans.Rows;
import me.prettyprint.hector.api.exceptions.HectorException;
import me.prettyprint.hector.api.mutation.Mutator;
import me.prettyprint.hector.api.query.ColumnQuery;
import me.prettyprint.hector.api.query.MultigetSliceQuery;
import me.prettyprint.hector.api.query.QueryResult;

/**
* Thread Safe
* @author Ran Tavory
*
*/
public class ExampleDaoV2 {

private final static String KEYSPACE = "LiftDNA_KeySpace";
private final static String HOST_PORT = "192.168.11.124:9160";
private final static String CF_NAME = "User_ColumnFamily";
/** Column name where values are stored */
private final static String COLUMN_NAME = "v";
private final StringSerializer serializer = StringSerializer.get();

private final Keyspace keyspace;

public static void main(String[] args) throws HectorException {
Cluster c = getOrCreateCluster("Test Cluster", HOST_PORT);
ExampleDaoV2 ed = new ExampleDaoV2(createKeyspace(KEYSPACE, c));
ed.insert("key1", "value1", StringSerializer.get());

System.out.println(ed.get("key1", StringSerializer.get()));
}

public ExampleDaoV2(Keyspace keyspace) {
this.keyspace = keyspace;
}

/**
* Insert a new value keyed by key
*
* @param key   Key for the value
* @param value the String value to insert
*/
public <K> void insert(final K key, final String value, Serializer<K> keySerializer) {
createMutator(keyspace, keySerializer).insert(
key, CF_NAME, createColumn(COLUMN_NAME, value, serializer, serializer));
}

/**
* Get a string value.
*
* @return The string value; null if no value exists for the given key.
*/
public <K> String get(final K key, Serializer<K> keySerializer) throws HectorException {
ColumnQuery<K, String, String> q = createColumnQuery(keyspace, keySerializer, serializer, serializer);
QueryResult<HColumn<String, String>> r = q.setKey(key).
setName(COLUMN_NAME).
setColumnFamily(CF_NAME).
execute();
HColumn<String, String> c = r.get();
return c == null ? null : c.getValue();
}

/**
* Get multiple values
* @param keys
* @return
*/
public <K> Map<K, String> getMulti(Serializer<K> keySerializer, K... keys) {
MultigetSliceQuery<K, String,String> q = createMultigetSliceQuery(keyspace, keySerializer, serializer, serializer);
q.setColumnFamily(CF_NAME);
q.setKeys(keys);
q.setColumnNames(COLUMN_NAME);

QueryResult<Rows<K, String,String>> r = q.execute();
Rows<K, String,String> rows = r.get();
Map<K, String> ret = new HashMap<K, String>(keys.length);
for (K k: keys) {
HColumn<String,String> c = rows.getByKey(k).getColumnSlice().getColumnByName(COLUMN_NAME);
if (c != null && c.getValue() != null) {
ret.put(k, c.getValue());
}
}
return ret;
}

/**
* Insert multiple values
*/
public <K> void insertMulti(Map<K, String> keyValues, Serializer<K> keySerializer) {
Mutator<K> m = createMutator(keyspace, keySerializer);
for (Map.Entry<K, String> keyValue: keyValues.entrySet()) {
m.addInsertion(keyValue.getKey(), CF_NAME,
createColumn(COLUMN_NAME, keyValue.getValue(), keyspace.createClock(), serializer, serializer));
}
m.execute();
}

/**
* Delete multiple values
*/
public <K> void delete(Serializer<K> keySerializer, K... keys) {
Mutator<K> m = createMutator(keyspace, keySerializer);
for (K key: keys) {
m.addDeletion(key, CF_NAME,  COLUMN_NAME, serializer);
}
m.execute();
}
}

使用CQL查询数据(基于Hector)

Cluster c = getOrCreateCluster("Test Cluster", HOST_PORT);
Keyspace keyspace = createKeyspace("LiftDNA_KeySpace", c);
StringSerializer serializer = StringSerializer.get();
CqlQuery<String,String,String> cqlQuery = new CqlQuery<String,String,String>(keyspace, serializer, serializer, serializer);
cqlQuery.setQuery("select * from User_ColumnFamily");
QueryResult<CqlRows<String,String,String>> result = cqlQuery.execute();
System.out.println(result.get().getByKey("rkey").getColumnSlice().getColumnByName("ckey").getValue());
参考网址:https://github.com/rantav/hector/wiki/Using-CQL

A little piece of code to increment counters of a superColumn

String key = "keyOfRow";
String superColumnFamily = "foo";
String superColumnName ="bar";
StringSerializer ss = StringSerializer.get();
// let's insert a couple of counter columns in a superColumn
String columnName1="aaa";
String columnName2="bbb";
List <HCounterColumn<String>> myCounters = new
ArrayList<HCounterColumn<String>>();
myCounters.add(HFactory.createCounterColumn(columnName1, 1L));
myCounters.add(HFactory.createCounterColumn(columnName2, 42L));
Mutator mutator = HFactory.createMutator(keyspace,
StringSerializer.get());
mutator.insertCounter(key,superColumnFamily ,
HFactory.createCounterSuperColumn(superColumnName, myCounters, ss, ss));
mutator.execute();

其他资料:

http://wingware.iteye.com/blog/1144584


http://cmzx3444.iteye.com/blog/663800

http://www.cnblogs.com/mobile/archive/2010/06/12/1757272.html


http://www.360doc.com/content/11/0309/00/5728962_99408528.shtml


其他客户端工具:

http://wiki.apache.org/cassandra/ClientOptions

最后,我们还需要重新去理解一下Cassandra的数据结构。

http://wiki.apache.org/cassandra/DataModel

附加内容:几个Cassandra建列族的例子

/*This file contains an example Keyspace that can be created using the
cassandra-cli command line interface as follows.

bin/cassandra-cli -host localhost --file conf/schema-sample.txt

The cassandra-cli includes online help that explains the statements below. You can
accessed the help without connecting to a running cassandra instance by starting the
client and typing "help;"
*/

create keyspace Keyspace1
    with strategy_options=[{replication_factor:1}]
    and placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy';

use Keyspace1;

create column family Standard1
    with comparator = BytesType
    and keys_cached = 10000
    and rows_cached = 1000
    and row_cache_save_period = 0
    and key_cache_save_period = 3600
    and memtable_flush_after = 59
    and memtable_throughput = 255
    and memtable_operations = 0.29;

create column family Standard2
    with comparator = UTF8Type
    and read_repair_chance = 0.1
    and keys_cached = 100
    and gc_grace = 0
    and min_compaction_threshold = 5
    and max_compaction_threshold = 31;

create column family StandardByUUID1
    with comparator = TimeUUIDType;

create column family Super1
    with column_type = Super
    and comparator = BytesType
    and subcomparator = BytesType;

create column family Super2
    with column_type = Super
    and subcomparator = UTF8Type
    and rows_cached = 10000
    and keys_cached = 50
    and comment = 'A column family with supercolumns, whose column and subcolumn names are UTF8 strings';

create column family Super3
    with column_type = Super
    and comparator = LongType
    and comment = 'A column family with supercolumns, whose column names are Longs (8 bytes)';

create column family Indexed1
    with comparator = UTF8Type
    and default_validation_class = LongType
    and column_metadata = [{
        column_name : birthdate,
        validation_class : LongType,
        index_name : birthdate_idx,
        index_type : 0}
    ];

create column family Counter1
    with default_validation_class = CounterColumnType;

create column family SuperCounter1
    with column_type = Super
    and default_validation_class = CounterColumnType;
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息