您的位置:首页 > 其它

利用thrift API访问Cassandra 第二天

2011-04-12 16:18 267 查看
忘记说Cassandra的版本信息了,apache-cassandra-0.7.2
今天写几个基本的操作,插入,删除,查询
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.KeySlice;
import org.apache.cassandra.thrift.KsDef;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.cassandra.thrift.TimedOutException;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.thrift.Cassandra.Client;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
/*
* Cassandra操作サンプル
* @author ryu
* @date 2011-03-07
*/
public class JCassandraClient {
public static void main(String[] args) throws InvalidRequestException,
NotFoundException, UnavailableException, TimedOutException,
TException, UnsupportedEncodingException {
// TSocket socket = new TSocket("192.168.1.113", 9160);
TSocket socket = new TSocket("localhost", 9160);
TBinaryProtocol binaryProtocol = new TBinaryProtocol(new TFramedTransport(socket));
Client client = new Client(binaryProtocol);
socket.open();
// ※データの最小単位はcolumnです。columnの構造は[name,value,timestamp]です。
// 指定しているrow keyにcolumnを追加する
insertColumnSample(client);
// 指定しているcolumnを取得する
getOneColumnSample(client);
// 指定しているcolumnを削除する
deleteColumnSample(client);
// column family内にすべてのcolumnを取得する
//getRowsColumnSample(client);
// 一行のcolumnを取得する
//getOneRowColumnSample(client);
// データベース切断
socket.close();
}
/**
* 登録データ
*
* @param client
*/
private static void insertColumnSample(Client client){
try{
// Keyspace
String keyspace = "Ryu";
// Row Key
String key = "1000000001";
// Column Family
String columnFamily = "T_ERPBASE_USER";
// 指定Keyspace
client.set_keyspace(keyspace);
ByteBuffer keyBuffer = ByteBuffer.wrap(key.getBytes("UTF-8"));
ColumnParent parent = new ColumnParent(columnFamily).setSuper_column((ByteBuffer) null);

// 新規しているColumn:[name,value,timestamp]
Column newCol = new Column(ByteBufferUtil.bytes("USER_NAME"),ByteBufferUtil.bytes("テスト太郎"), System.currentTimeMillis());
// ①新規または更新
client.insert(keyBuffer, parent, newCol,ConsistencyLevel.ONE);
} catch(UnsupportedEncodingException e){
e.printStackTrace();
} catch(TException e){
e.printStackTrace();
} catch(InvalidRequestException e){
e.printStackTrace();
} catch(TimedOutException e){
e.printStackTrace();
} catch(UnavailableException e){
e.printStackTrace();
}
}
/**
* 指定しているcolumnを取得する
*
* @param client
*/
private static void getOneColumnSample(Client client){
try{
// Keyspace
String keyspace = "Ryu";
// Row Key
String key = "1000000001";
// Column Family
String columnFamily = "T_ERPBASE_USER";
ByteBuffer keyBuffer = ByteBuffer.wrap(key.getBytes("UTF-8"));
// 指定Keyspace
client.set_keyspace(keyspace);
// ②指定しているcolumnデータを取得
// 検索対象column;検索条件name = USER_NAME
ColumnPath col = new ColumnPath(columnFamily).setSuper_column((ByteBuffer) null).setColumn("USER_NAME".getBytes());
Column column = client.get(keyBuffer, col, ConsistencyLevel.ONE).column;

System.out.println("read row " + new String(keyBuffer.array(), "UTF-8") + " " + new String(column.getName(), "UTF-8") + ":" + new String(column.getValue(), "UTF-8") + ":" + column.timestamp);
} catch(UnsupportedEncodingException e){
e.printStackTrace();
} catch(TException e){
e.printStackTrace();
} catch(InvalidRequestException e){
e.printStackTrace();
} catch(TimedOutException e){
e.printStackTrace();
} catch(UnavailableException e){
e.printStackTrace();
} catch(NotFoundException e){
e.printStackTrace();
}
}
/**
* 指定しているcolumnを削除する(物理削除)
*
* @param client
*/
private static void deleteColumnSample(Client client){
try{
// Keyspace
String keyspace = "Ryu";
// Row Key
String key = "1000000001";
// Column Family
String columnFamily = "T_ERPBASE_USER";
ByteBuffer keyBuffer = ByteBuffer.wrap(key.getBytes("UTF-8"));
// 指定Keyspace
client.set_keyspace(keyspace);
ColumnPath col = new ColumnPath(columnFamily).setSuper_column((ByteBuffer) null).setColumn("USER_NAME".getBytes());
// ③削除
client.remove(keyBuffer, col, System.currentTimeMillis(), ConsistencyLevel.ONE);
} catch(UnsupportedEncodingException e){
e.printStackTrace();
} catch(TException e){
e.printStackTrace();
} catch(InvalidRequestException e){
e.printStackTrace();
} catch(TimedOutException e){
e.printStackTrace();
} catch(UnavailableException e){
e.printStackTrace();
}
}
/**
* column familyの内にすべてのcolumnを
*
* @param client
*/
private static void getRowsColumnSample(Client client){
try{
// Keyspace
String keyspace = "Ryu";
// Row Key
String key = "rowKey_1000000001";
// Column Family
String columnFamily = "T_ERPBASE_USER";

ByteBuffer keyBuffer = ByteBuffer.wrap(key.getBytes("UTF-8"));
ColumnParent parent = new ColumnParent(columnFamily).setSuper_column((ByteBuffer) null);

// 指定Keyspace
client.set_keyspace(keyspace);
// 複数行データを取得
KsDef ksDef = client.describe_keyspace(keyspace);

if(ksDef != null){
// (列設定)
SlicePredicate predicate = new SlicePredicate();
// 指定しているcolumn名により抽出
// predicate.addToColumn_names(ByteBuffer.wrap("USER_ID".getBytes("UTF-8")));
// predicate.addToColumn_names(ByteBuffer.wrap("USER_PASS".getBytes("UTF-8")));
// predicate.addToColumn_names(ByteBuffer.wrap("USER_NAME".getBytes("UTF-8")));
// 指定しているcolumn範囲により抽出
SliceRange sliceRange = new SliceRange();
// すべて抽出
// nameにより開始箇所未限定
sliceRange.setStart(new byte[0]);
// nameにより終了箇所未限定
sliceRange.setFinish(new byte[0]);
// データ件数(column列数)
//sliceRange.setCount(3);
// 順次優先で データ件数設定により、データを取得。true:降順;false:昇順
sliceRange.setReversed(false);
predicate.setSlice_range(sliceRange);
List cf_defs = ksDef.getCf_defs();
for(int i = 0; i < cf_defs.size(); i ++){
CfDef cfDef = cf_defs.get(i);
if(cfDef.getName().equals(columnFamily)){
// すべてのキーに検索(行設定)
KeyRange keyRange = new KeyRange();
keyRange.setStart_key(ByteBuffer.wrap("".getBytes("UTF-8")));
keyRange.setEnd_key(ByteBuffer.wrap("".getBytes("UTF-8")));
int length = client.get_count(keyBuffer ,parent, predicate, ConsistencyLevel.ONE);
System.out.println("length==========================================>" + length);
List lstRow = client.get_range_slices(parent, predicate, keyRange, ConsistencyLevel.ONE);
System.out.println("size==========================================>" + lstRow.size());
for (KeySlice slice : lstRow) {
String rowKey = new String(slice.getKey(),"UTF-8");
List cols = slice.getColumns();
for (ColumnOrSuperColumn cs : cols) {
Column col = cs.getColumn();
System.out.println("read row " + rowKey + " " + new String(col.getName(), "UTF-8") + ":" + new String(col.getVal
4000
ue(), "UTF-8"));
}
}
}
}
}
} catch(UnsupportedEncodingException e){
e.printStackTrace();
} catch(TException e){
e.printStackTrace();
} catch(InvalidRequestException e){
e.printStackTrace();
} catch(TimedOutException e){
e.printStackTrace();
} catch(UnavailableException e){
e.printStackTrace();
} catch(NotFoundException e){
e.printStackTrace();
}
}
/**
* 指定しているrow keyにより、一行のcolumnを取得する
*
* @param client
*/
private static void getOneRowColumnSample(Client client){
try{
// Keyspace
String keyspace = "Ryu";
// Row Key
String key = "rowKey_1000000001";
// Column Family
String columnFamily = "T_ERPBASE_USER";

ByteBuffer keyBuffer = ByteBuffer.wrap(key.getBytes("UTF-8"));
ColumnParent parent = new ColumnParent(columnFamily).setSuper_column((ByteBuffer) null);

// 指定Keyspace
client.set_keyspace(keyspace);
// read entire row
SlicePredicate predicate = new SlicePredicate();
SliceRange sliceRange = new SliceRange();
// 抽出したいcolumn範囲
// BIRTHDAY ~ NICK_NAME
sliceRange.setStart("BIRTHDAY".getBytes());
sliceRange.setFinish("NICK_NAME".getBytes());
// データ件数
//sliceRange.setCount(3);
// 順次優先で データ件数設定により、データを取得。true:降順;false:昇順
sliceRange.setReversed(false);
predicate.setSlice_range(sliceRange);

// 指定しているcolumn名
List lstCols = new ArrayList();
ByteBuffer column_names1 = ByteBuffer.wrap("BIRTHDAY".getBytes("UTF-8"));
lstCols.add(column_names1);
ByteBuffer column_names2 = ByteBuffer.wrap("NICK_NAME".getBytes("UTF-8"));
lstCols.add(column_names2);
// 抽出したいcolumn名リスト
//predicate.setColumn_names(lstCols);

// 一行データを取得
List results = client.get_slice(keyBuffer, parent, predicate, ConsistencyLevel.ONE);
for (ColumnOrSuperColumn result : results) {
Column columnShow = result.column;
System.out.println(new String(columnShow.getName(), "UTF8") + " -> " + new String(columnShow.getValue(), "UTF8"));
}
} catch(UnsupportedEncodingException e){
e.printStackTrace();
} catch(TException e){
e.printStackTrace();
} catch(InvalidRequestException e){
e.printStackTrace();
} catch(TimedOutException e){
e.printStackTrace();
} catch(UnavailableException e){
e.printStackTrace();
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息