
Operating HBase from C# via Thrift 1

2016-07-22 11:21

What is Thrift?

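Thrift is an RPC (remote procedure call) software framework for developing scalable, cross-language services. It combines a powerful software stack with a code generation engine to build efficient services that work seamlessly across C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml.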

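Accessing HBase from C# via Thrift

HBase ships with a Thrift server. In HBase 1.x it can be started with bin/hbase-daemon.sh start thrift and listens on port 9090 by default.

A C# program can create a Thrift client that connects to this server and operates on HBase through it.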



Preparation

Download Thrift

Compiler: Thrift compiler for Windows (thrift-0.9.3.exe)

Source code and runtime library: thrift-0.9.3.tar.gz

Download the HBase package

hbase-1.2.2-bin.tar.gz

Generating the C# Thrift client

1. Create a folder to hold all the materials; this article uses workspace.


2. Extract thrift-0.9.3.tar.gz and copy its lib/csharp folder into workspace. If needed, change the project's target framework from .NET Framework 3.5 to 4.0 or later, then build it to obtain Thrift.dll.




3. Create a test project named HBaseTest and add a reference to Thrift.dll.


4. Extract the HBase package and copy the hbase.thrift file from it into workspace. Its location inside the package is:

\hbase-1.2.2\hbase-thrift\src\main\resources\org\apache\hadoop\hbase\thrift\hbase.thrift


Note: HBase provides two Thrift interfaces, thrift and thrift2. This article covers thrift.


5. Open a cmd prompt, change to workspace, and run the following command to generate the code:

thrift-0.9.3.exe -gen csharp hbase.thrift


This produces a gen-csharp directory containing the HBase Thrift client code.



6. Add the generated code to the HBaseTest project.



Development

AbstractHBaseThriftService: the abstract service

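The code here is essentially a C# translation of the Java code from the article "HBase Thrift客户端Java API实践" (HBase Thrift client Java API in practice). On the client side we add a layer of abstraction that makes it easier to pass the various parameters. The abstract class is AbstractHBaseThriftService, implemented as follows: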

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

using Thrift.Transport;
using Thrift.Protocol;

namespace HBaseTest
{
/// <summary>
/// Abstract HBase Thrift client service
/// </summary>
public abstract class AbstractHBaseThriftService
{
protected static readonly string CHARSET = "UTF-8";
private string host = "localhost";
private int port = 9090;
private readonly TTransport transport;
protected readonly Hbase.Client client;

public AbstractHBaseThriftService() :
this("localhost", 9090)
{

}

public AbstractHBaseThriftService(string host, int port)
{
this.host = host;
this.port = port;
transport = new TSocket(host, port);
TProtocol protocol = new TBinaryProtocol(transport, true, true);
client = new Hbase.Client(protocol);
}

/// <summary>
/// Opens the connection (transport) to the Thrift server
/// </summary>
public void Open()
{
if (transport != null)
{
transport.Open();
}
}

/// <summary>
/// Closes the connection (transport) to the Thrift server
/// </summary>
public void Close()
{
if (transport != null)
{
transport.Close();
}
}

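/// <summary>
/// Gets the names of all user tables in HBase
/// </summary>
/// <returns>Table names</returns>
public abstract List<string> GetTables();

/// <summary>
/// Creates a table with the given column families
/// </summary>
/// <param name="table">Table name</param>
/// <param name="columnFamilies">Column family names, e.g. info</param>
public virtual void CreateTable(string table, IList<string> columnFamilies)
{
List<ColumnDescriptor> descriptors = new List<ColumnDescriptor>();
foreach (string family in columnFamilies)
{
// Other ColumnDescriptor fields keep the defaults declared in hbase.thrift (e.g. maxVersions = 3)
descriptors.Add(new ColumnDescriptor { Name = Encoding.UTF8.GetBytes(family) });
}
client.createTable(Encoding.UTF8.GetBytes(table), descriptors);
}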

/// <summary>
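/// Updates (puts) a single cell of a row
/// </summary>
/// <param name="table">Table name</param>
/// <param name="rowKey">Row key</param>
/// <param name="writeToWal">Whether to write the change to the write-ahead log (WAL)</param>
/// <param name="fieldName">Column name in family:qualifier form, e.g. info:name</param>
/// <param name="fieldValue">Cell value</param>
/// <param name="attributes">Mutation attributes</param>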
public abstract void Update(
string table,
string rowKey,
bool writeToWal,
string fieldName,
string fieldValue,
Dictionary<string, string> attributes);

/// <summary>
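/// Updates (puts) several cells of a row in a single call
/// </summary>
/// <param name="table">Table name</param>
/// <param name="rowKey">Row key</param>
/// <param name="writeToWal">Whether to write the change to the write-ahead log (WAL)</param>
/// <param name="fieldNameValues">Map from column name (family:qualifier) to cell value</param>
/// <param name="attributes">Mutation attributes</param>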
public abstract void Update(
string table,
string rowKey,
bool writeToWal,
Dictionary<string, string> fieldNameValues,
Dictionary<string, string> attributes);

/// <summary>
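/// Deletes one cell from a row
/// </summary>
/// <param name="table">Table name</param>
/// <param name="rowKey">Row key</param>
/// <param name="writeToWal">Whether to write the change to the WAL</param>
/// <param name="column">Column to delete, in family:qualifier form</param>
/// <param name="attributes">Mutation attributes</param>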
public abstract void DeleteCell(
string table,
string rowKey,
bool writeToWal,
string column,
Dictionary<string, string> attributes);

/// <summary>
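/// Deletes the specified cells from a row
/// </summary>
/// <param name="table">Table name</param>
/// <param name="rowKey">Row key</param>
/// <param name="writeToWal">Whether to write the change to the WAL</param>
/// <param name="columns">Columns to delete, in family:qualifier form</param>
/// <param name="attributes">Mutation attributes</param>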
public abstract void DeleteCells(
string table,
string rowKey,
bool writeToWal,
List<string> columns,
Dictionary<string, string> attributes);

/// <summary>
/// Deletes an entire row
/// </summary>
/// <param name="table">Table name</param>
/// <param name="rowKey">Row key</param>
/// <param name="attributes">Mutation attributes</param>
public abstract void DeleteRow(
string table,
string rowKey,
Dictionary<string, string> attributes);

/// <summary>
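/// Opens a scanner that starts at startRow and runs to the end of the table
/// </summary>
/// <param name="table">Table name</param>
/// <param name="startRow">First row key to scan (inclusive)</param>
/// <param name="columns">Columns to return; an empty list returns all columns</param>
/// <param name="attributes">Scan attributes</param>
/// <returns>Scanner id</returns>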
public abstract int ScannerOpen(
string table,
string startRow,
List<string> columns,
Dictionary<string, string> attributes);

/// <summary>
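/// Opens a scanner over the range [startRow, stopRow); stopRow itself is not returned
/// </summary>
/// <param name="table">Table name</param>
/// <param name="startRow">First row key to scan (inclusive)</param>
/// <param name="stopRow">Row key at which the scan stops (exclusive)</param>
/// <param name="columns">Columns to return; an empty list returns all columns</param>
/// <param name="attributes">Scan attributes</param>
/// <returns>Scanner id</returns>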
public abstract int ScannerOpen(
string table,
string startRow,
string stopRow,
List<string> columns,
Dictionary<string, string> attributes);

/// <summary>
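/// Opens a scanner over the rows whose key starts with startAndPrefix
/// </summary>
/// <param name="table">Table name</param>
/// <param name="startAndPrefix">Row key prefix, also used as the start row</param>
/// <param name="columns">Columns to return</param>
/// <param name="attributes">Scan attributes</param>
/// <returns>Scanner id</returns>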
public abstract int ScannerOpenWithPrefix(
string table,
string startAndPrefix,
List<string> columns,
Dictionary<string, string> attributes);

/// <summary>
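/// Opens a scanner that starts at startRow, restricted by the given timestamp
/// </summary>
/// <param name="table">Table name</param>
/// <param name="startRow">First row key to scan (inclusive)</param>
/// <param name="columns">Columns to return</param>
/// <param name="timestamp">Timestamp limit for the returned cells</param>
/// <param name="attributes">Scan attributes</param>
/// <returns>Scanner id</returns>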
public abstract int ScannerOpenTs(
string table,
string startRow,
List<string> columns,
long timestamp,
Dictionary<string, string> attributes);

/// <summary>
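/// Opens a scanner over [startRow, stopRow), restricted by the given timestamp
/// </summary>
/// <param name="table">Table name</param>
/// <param name="startRow">First row key to scan (inclusive)</param>
/// <param name="stopRow">Row key at which the scan stops (exclusive)</param>
/// <param name="columns">Columns to return</param>
/// <param name="timestamp">Timestamp limit for the returned cells</param>
/// <param name="attributes">Scan attributes</param>
/// <returns>Scanner id</returns>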
public abstract int ScannerOpenTs(
string table,
string startRow,
string stopRow,
List<string> columns,
long timestamp,
Dictionary<string, string> attributes);

/// <summary>
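/// Fetches up to nbRows rows from a scanner; an empty list means the scan is finished
/// </summary>
/// <param name="id">Scanner id</param>
/// <param name="nbRows">Maximum number of rows to return</param>
/// <returns>Rows read</returns>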
public abstract List<TRowResult> ScannerGetList(int id, int nbRows);

/// <summary>
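/// Fetches the next row from a scanner; an empty list means the scan is finished
/// </summary>
/// <param name="id">Scanner id</param>
/// <returns>The row read (at most one)</returns>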
public abstract List<TRowResult> ScannerGet(int id);

/// <summary>
/// Gets one row with all of its columns
/// </summary>
/// <param name="table">Table name</param>
/// <param name="row">Row key</param>
/// <param name="attributes">Get attributes</param>
/// <returns>The row (empty if it does not exist)</returns>
public abstract List<TRowResult> GetRow(
string table,
string row,
Dictionary<string, string> attributes);

/// <summary>
/// Gets several rows with all of their columns
/// </summary>
/// <param name="table">Table name</param>
/// <param name="rows">Row keys</param>
/// <param name="attributes">Get attributes</param>
/// <returns>Rows found</returns>
public abstract List<TRowResult> GetRows(
string table,
List<string> rows,
Dictionary<string, string> attributes);

/// <summary>
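/// Gets several rows, returning only the specified columns
/// </summary>
/// <param name="table">Table name</param>
/// <param name="rows">Row keys</param>
/// <param name="columns">Columns to return, in family:qualifier form</param>
/// <param name="attributes">Get attributes</param>
/// <returns>Rows found</returns>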
public abstract List<TRowResult> GetRowsWithColumns(
string table,
List<string> rows,
List<string> columns,
Dictionary<string, string> attributes);

/// <summary>
/// Closes a scanner and releases it on the server
/// </summary>
/// <param name="id">Scanner id</param>
public abstract void ScannerClose(int id);

/// <summary>
/// Prints the columns of a result to the console (for debugging)
/// </summary>
/// <param name="result">Row result</param>
public abstract void IterateResults(TRowResult result);

}
}


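Basic functions of the Thrift service

- Connect to the Thrift service: Open(); disconnect: Close()
- Get the names of all tables in HBase: GetTables()
- Update records in an HBase table: Update()
- Delete data (cells) from a row of an HBase table: DeleteCell() and DeleteCells()
- Delete an entire row from an HBase table: DeleteRow()
- Open a scanner, which returns an id: ScannerOpen(), ScannerOpenWithPrefix(), ScannerOpenTs(); then use the returned id to iterate over the records: ScannerGetList(), ScannerGet()
- Get row results: GetRow(), GetRows(), GetRowsWithColumns()
- Close a scanner: ScannerClose()
- Print results, for debugging: IterateResults()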
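The scanner functions in this list are meant to be used as a unit: open a scanner, fetch batches until an empty list comes back, then close it. The sketch below shows that lifecycle against a hypothetical in-memory stand-in (FakeScanner and ScannerLoop are illustrative names, not part of Thrift or HBase) so that it runs without a server; with the real service, GetList and Close correspond to ScannerGetList(id, nbRows) and ScannerClose(id). The try/finally ensures the server-side scanner is released even if processing a batch throws.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for a server-side scanner: hands out rows
// in batches and returns an empty list once it is exhausted
public class FakeScanner
{
    private readonly List<string> rows;
    private int position;

    public bool Closed { get; private set; }

    public FakeScanner(IEnumerable<string> rows)
    {
        this.rows = rows.ToList();
    }

    // Plays the role of ScannerGetList(id, nbRows)
    public List<string> GetList(int nbRows)
    {
        List<string> batch = rows.Skip(position).Take(nbRows).ToList();
        position += batch.Count;
        return batch;
    }

    // Plays the role of ScannerClose(id)
    public void Close()
    {
        Closed = true;
    }
}

public static class ScannerLoop
{
    // Reads batches until an empty list comes back and always closes
    // the scanner, even if processing a batch throws
    public static List<string> ReadAll(FakeScanner scanner, int nbRows)
    {
        List<string> all = new List<string>();
        try
        {
            List<string> batch = scanner.GetList(nbRows);
            while (batch.Count > 0)
            {
                all.AddRange(batch);
                batch = scanner.GetList(nbRows);
            }
        }
        finally
        {
            scanner.Close();
        }
        return all;
    }
}
```

For example, reading five rows with nbRows = 2 takes four calls (two, two, one, then an empty batch) and leaves the scanner closed.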


HBaseThriftService: the implementation

Based on the abstract service definition above, one implementation is as follows:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace HBaseTest
{
/// <summary>
/// HBase Thrift service implementation
/// </summary>
public class HBaseThriftService : AbstractHBaseThriftService
{
public HBaseThriftService()
: this("localhost", 9090)
{

}

public HBaseThriftService(string host, int port)
: base(host, port)
{

}

/// <inheritdoc />
public override List<string> GetTables()
{
List<byte[]> tables = client.getTableNames();
List<String> list = new List<String>();
foreach (byte[] table in tables)
{
list.Add(Decode(table));
}
return list;
}

/// <inheritdoc />
public override void Update(string table, string rowKey, bool writeToWal, string fieldName, string fieldValue, Dictionary<string, string> attributes)
{
byte[] tableName = Encode(table);
byte[] row = Encode(rowKey);
Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
List<Mutation> mutations = new List<Mutation>();
Mutation mutation = new Mutation();
mutation.IsDelete = false;
mutation.WriteToWAL = writeToWal;
mutation.Column = Encode(fieldName);
mutation.Value = Encode(fieldValue);
mutations.Add(mutation);
client.mutateRow(tableName, row, mutations, encodedAttributes);
}

/// <inheritdoc />
public override void Update(string table, string rowKey, bool writeToWal, Dictionary<string, string> fieldNameValues, Dictionary<string, string> attributes)
{
byte[] tableName = Encode(table);
byte[] row = Encode(rowKey);
Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
List<Mutation> mutations = new List<Mutation>();
foreach (KeyValuePair<String, String> pair in fieldNameValues)
{
Mutation mutation = new Mutation();
mutation.IsDelete = false;
mutation.WriteToWAL = writeToWal;
mutation.Column = Encode(pair.Key);
mutation.Value = Encode(pair.Value);
mutations.Add(mutation);
}
client.mutateRow(tableName, row, mutations, encodedAttributes);
}

/// <inheritdoc />
public override void DeleteCell(string table, string rowKey, bool writeToWal, string column, Dictionary<string, string> attributes)
{
byte[] tableName = Encode(table);
byte[] row = Encode(rowKey);
Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
List<Mutation> mutations = new List<Mutation>();
Mutation mutation = new Mutation();
mutation.IsDelete = true;
mutation.WriteToWAL = writeToWal;
mutation.Column = Encode(column);
mutations.Add(mutation);
client.mutateRow(tableName, row, mutations, encodedAttributes);
}

/// <inheritdoc />
public override void DeleteCells(string table, string rowKey, bool writeToWal, List<string> columns, Dictionary<string, string> attributes)
{
byte[] tableName = Encode(table);
byte[] row = Encode(rowKey);
Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
List<Mutation> mutations = new List<Mutation>();
foreach (string column in columns)
{
Mutation mutation = new Mutation();
mutation.IsDelete = true;
mutation.WriteToWAL = writeToWal;
mutation.Column = Encode(column);
mutations.Add(mutation);
}
client.mutateRow(tableName, row, mutations, encodedAttributes);
}

/// <inheritdoc />
public override void DeleteRow(string table, string rowKey, Dictionary<string, string> attributes)
{
byte[] tableName = Encode(table);
byte[] row = Encode(rowKey);
Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
client.deleteAllRow(tableName, row, encodedAttributes);
}

/// <inheritdoc />
public override int ScannerOpen(string table, string startRow, List<string> columns, Dictionary<string, string> attributes)
{
byte[] tableName = Encode(table);
byte[] start = Encode(startRow);
List<byte[]> encodedColumns = EncodeStringList(columns);
Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
return client.scannerOpen(tableName, start, encodedColumns, encodedAttributes);
}

/// <inheritdoc />
public override int ScannerOpen(string table, string startRow, string stopRow, List<string> columns, Dictionary<string, string> attributes)
{
byte[] tableName = Encode(table);
byte[] start = Encode(startRow);
byte[] stop = Encode(stopRow);
List<byte[]> encodedColumns = EncodeStringList(columns);
Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
return client.scannerOpenWithStop(tableName, start, stop, encodedColumns, encodedAttributes);
}

/// <inheritdoc />
public override int ScannerOpenWithPrefix(string table, string startAndPrefix, List<string> columns, Dictionary<string, string> attributes)
{
byte[] tableName = Encode(table);
byte[] prefix = Encode(startAndPrefix);
List<byte[]> encodedColumns = EncodeStringList(columns);
Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
return client.scannerOpenWithPrefix(tableName, prefix, encodedColumns, encodedAttributes);
}

/// <inheritdoc />
public override int ScannerOpenTs(string table, string startRow, List<string> columns, long timestamp, Dictionary<string, string> attributes)
{
byte[] tableName = Encode(table);
byte[] start = Encode(startRow);
List<byte[]> encodedColumns = EncodeStringList(columns);
Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
return client.scannerOpenTs(tableName, start, encodedColumns, timestamp, encodedAttributes);
}

/// <inheritdoc />
public override int ScannerOpenTs(string table, string startRow, string stopRow, List<string> columns, long timestamp, Dictionary<string, string> attributes)
{
byte[] tableName = Encode(table);
byte[] start = Encode(startRow);
byte[] stop = Encode(stopRow);
List<byte[]> encodedColumns = EncodeStringList(columns);
Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
return client.scannerOpenWithStopTs(tableName, start, stop, encodedColumns, timestamp, encodedAttributes);
}

/// <inheritdoc />
public override List<TRowResult> ScannerGetList(int id, int nbRows)
{
return client.scannerGetList(id, nbRows);
}

/// <inheritdoc />
public override List<TRowResult> ScannerGet(int id)
{
return client.scannerGet(id);
}

/// <inheritdoc />
public override List<TRowResult> GetRow(string table, string row, Dictionary<string, string> attributes)
{
byte[] tableName = Encode(table);
byte[] startRow = Encode(row);
Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
return client.getRow(tableName, startRow, encodedAttributes);
}

/// <inheritdoc />
public override List<TRowResult> GetRows(string table, List<string> rows, Dictionary<string, string> attributes)
{
byte[] tableName = Encode(table);
List<byte[]> encodedRows = EncodeStringList(rows);
Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
return client.getRows(tableName, encodedRows, encodedAttributes);
}

/// <inheritdoc />
public override List<TRowResult> GetRowsWithColumns(string table, List<string> rows, List<string> columns, Dictionary<string, string> attributes)
{
byte[] tableName = Encode(table);
List<byte[]> encodedRows = EncodeStringList(rows);
List<byte[]> encodedColumns = EncodeStringList(columns);
Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
return client.getRowsWithColumns(tableName, encodedRows, encodedColumns, encodedAttributes);
}

/// <inheritdoc />
public override void ScannerClose(int id)
{
client.scannerClose(id);
}

/// <inheritdoc />
public override void IterateResults(TRowResult result)
{
foreach (KeyValuePair<byte[], TCell> pair in result.Columns)
{
Console.WriteLine("\tCol=" + Decode(pair.Key) + ", Value=" + Decode(pair.Value.Value));
}
}

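/// <summary>
/// Decodes UTF-8 bytes into a string
/// </summary>
private String Decode(byte[] bs)
{
// Use Encoding.UTF8 explicitly: UTF8Encoding.Default is the inherited
// Encoding.Default, which is the ANSI code page on .NET Framework
return Encoding.UTF8.GetString(bs);
}

/// <summary>
/// Encodes a string as UTF-8 bytes
/// </summary>
private byte[] Encode(String str)
{
return Encoding.UTF8.GetBytes(str);
}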

/// <summary>
/// Encodes attribute keys and values as UTF-8 bytes (null yields an empty map)
/// </summary>
private Dictionary<byte[], byte[]> EncodeAttributes(Dictionary<String, String> attributes)
{
Dictionary<byte[], byte[]> encodedAttributes = new Dictionary<byte[], byte[]>();
if (attributes != null)
{
foreach (KeyValuePair<String, String> pair in attributes)
{
encodedAttributes.Add(Encode(pair.Key), Encode(pair.Value));
}
}
return encodedAttributes;
}

/// <summary>
/// Encodes each string as UTF-8 bytes (null yields an empty list)
/// </summary>
private List<byte[]> EncodeStringList(List<String> strings)
{
List<byte[]> list = new List<byte[]>();
if (strings != null)
{
foreach (String str in strings)
{
list.Add(Encode(str));
}
}
return list;
}
}
}
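In Thrift 1 every argument (table name, row key, column, value, attribute) travels as a byte[], so the Encode/Decode helpers decide how text is stored in HBase. It is worth pinning them to UTF-8 explicitly: UTF8Encoding.Default is simply Encoding.Default reached through a derived class, and on .NET Framework that is the system ANSI code page, not UTF-8. A minimal sketch of such helpers (TextCodec is a hypothetical name):

```csharp
using System.Text;

// Hypothetical helper: UTF-8 conversion for Thrift 1 byte[] arguments.
// Encoding.UTF8 is used explicitly because Encoding.Default (also reachable
// as UTF8Encoding.Default) is the ANSI code page on .NET Framework.
public static class TextCodec
{
    // GetBytes does not emit a byte order mark
    public static byte[] Encode(string s)
    {
        return Encoding.UTF8.GetBytes(s);
    }

    public static string Decode(byte[] bytes)
    {
        return Encoding.UTF8.GetString(bytes);
    }
}
```

For example, Encode("张三") yields 6 bytes (3 per CJK character) and Decode restores the same string. Under a GBK code page the same name would be 4 bytes, and a Java client reading the cell as UTF-8 would see garbled text.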


HBase Thrift test cases

We wrote the following test cases against HBase's Thrift interface:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace HBaseTest
{
class Test
{
private readonly AbstractHBaseThriftService thriftService;

public Test(String host, int port)
{
thriftService = new HBaseThriftService(host, port);
thriftService.Open();
}

public Test() : this("master", 9090)
{

}

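// One shared Random: new Random() instances created in quick succession
// can get the same time-based seed on .NET Framework and repeat values
private static readonly Random random = new Random();

static String RandomlyBirthday()
{
int year = 1900 + random.Next(100);
int month = 1 + random.Next(12);
// Pick a day that exists in that month (avoids dates such as 1990-02-30)
int date = 1 + random.Next(DateTime.DaysInMonth(year, month));
return year + "-" + month.ToString().PadLeft(2, '0') + "-" + date.ToString().PadLeft(2, '0');
}

static String RandomlyGender()
{
return random.Next(2) == 0 ? "M" : "F";
}

static String RandomlyUserType()
{
// User type 1 to 10
return (1 + random.Next(10)).ToString();
}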

public void Close()
{
thriftService.Close();
}

public void CaseForGetTables()
{
var tableNameList = thriftService.GetTables();
foreach (var existedTableName in tableNameList)
{
Console.WriteLine("Table:{0}", existedTableName);
}
}

public void CaseForCreateTable(
string tableName,
IList<string> columnNameList)
{
thriftService.CreateTable(tableName, columnNameList);
}

public void CaseForUpdate()
{
bool writeToWal = false;
Dictionary<String, String> attributes = new Dictionary<String, String>(0);
string table = SetTable();
// put kv pairs

System.Diagnostics.Stopwatch watch = new System.Diagnostics.Stopwatch();
watch.Start();
for (long i = 0; i < 50000; i++)
{
// Keys are zero-padded to 4 digits; from 10000 upward they are 5 characters long
string rowKey = i.ToString().PadLeft(4, '0');
Dictionary<String, String> fieldNameValues = new Dictionary<String, String>();
// RandomNameGen is a random-name helper that is not shown in this article
fieldNameValues.Add("info:name", RandomNameGen.getRandomName());
fieldNameValues.Add("info:birthday", RandomlyBirthday());
fieldNameValues.Add("info:user_type", RandomlyUserType());
fieldNameValues.Add("info:gender", RandomlyGender());
thriftService.Update(table, rowKey, writeToWal, fieldNameValues, attributes);

if (i % 10000 == 0)
{
Console.WriteLine(
"Insert [{0}] Rows, Time Elapsed: {1} ms",
i,
watch.ElapsedMilliseconds);
}
}
watch.Stop();

Console.WriteLine(
"Insert [{0}] Rows, Time Elapsed: {1} ms",
50000,
watch.ElapsedMilliseconds);
}

public void CaseForDeleteCells()
{
bool writeToWal = false;
Dictionary<String, String> attributes = new Dictionary<String, String>(0);
String table = SetTable();
// delete the info:birthday cell of rows 0005 to 0009
for (long i = 5; i < 10; i++)
{
String rowKey = i.ToString().PadLeft(4, '0');
List<String> columns = new List<String>(0);
columns.Add("info:birthday");
thriftService.DeleteCells(table, rowKey, writeToWal, columns, attributes);
}
}

public void CaseForDeleteRow()
{
Dictionary<String, String> attributes = new Dictionary<String, String>(0);
String table = SetTable();
// delete rows
for (long i = 5; i < 10; i++)
{
String rowKey = i.ToString().PadLeft(4, '0');
thriftService.DeleteRow(table, rowKey, attributes);
}
}

public void CaseForScan()
{
Dictionary<String, String> attributes = new Dictionary<String, String>(0);
String table = SetTable();
String startRow = "0001";
// stopRow is exclusive: rows 0001 to 0014 are returned
String stopRow = "0015";
List<String> columns = new List<String>(0);
columns.Add("info:name");
int id = thriftService.ScannerOpen(table, startRow, stopRow, columns, attributes);
try
{
int nbRows = 2;
List<TRowResult> results = thriftService.ScannerGetList(id, nbRows);
while (results?.Count > 0)
{
foreach (TRowResult result in results)
{
thriftService.IterateResults(result);
}
results = thriftService.ScannerGetList(id, nbRows);
}
}
finally
{
// Always release the server-side scanner
thriftService.ScannerClose(id);
}
}

public void CaseForGet()
{
Dictionary<String, String> attributes = new Dictionary<String, String>(0);
String table = SetTable();
List<String> rows = new List<String>(0);
rows.Add("0009");
rows.Add("0098");
rows.Add("0999");
List<String> columns = new List<String>(0);
columns.Add("info:birthday");
columns.Add("info:gender");
List<TRowResult> results = thriftService.GetRowsWithColumns(table, rows, columns, attributes);
foreach (TRowResult result in results)
{
thriftService.IterateResults(result);
}
}

private string SetTable()
{
string table = "test-user";
return table;
}
}
}
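CaseForUpdate calls RandomNameGen.getRandomName(), a helper the article does not include, so the test project will not compile without some implementation of it. The following is a hypothetical stand-in, not the author's original helper: the class and method names are taken from the call site (hence the non-idiomatic lowercase method name), and the name lists are made up for illustration.

```csharp
using System;

namespace HBaseTest
{
    // Hypothetical stand-in for the RandomNameGen helper used by CaseForUpdate;
    // the original implementation is not shown in the article
    public static class RandomNameGen
    {
        // Sample surnames and given names, for illustration only
        private static readonly string[] Surnames = { "张", "王", "李", "赵", "陈" };
        private static readonly string[] GivenNames = { "伟", "芳", "娜", "敏", "静", "强" };

        // One shared Random so rapid successive calls do not reuse a seed
        private static readonly Random Rng = new Random();

        // Lowercase name kept to match the existing call site
        public static string getRandomName()
        {
            return Surnames[Rng.Next(Surnames.Length)] + GivenNames[Rng.Next(GivenNames.Length)];
        }
    }
}
```

Because the names contain Chinese characters, they also exercise the UTF-8 encoding path when written to HBase.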


Program: the console entry point

With the test cases in place, simply call them from the console program's entry point:

using System;
using System.Diagnostics;
using System.IO;
using System.Text;

namespace HBaseTest
{
class Program
{
static void Main(string[] args)
{
Test test = new Test();
test.CaseForGetTables();
test.CaseForCreateTable("test-user",new[] { "info" });
test.CaseForGetTables();
test.CaseForUpdate();
test.CaseForGet();
test.CaseForScan();
test.CaseForDeleteCells();
test.CaseForDeleteRow();
// Close the Thrift connection opened in the Test constructor
test.Close();
Console.ReadLine();
}
}
}
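HBase sorts row keys as raw unsigned bytes (lexicographically), which is why the tests zero-pad their keys and why a scan's stopRow is exclusive. PadLeft(4, '0') only gives fixed-width keys below 10000, while CaseForUpdate writes 50000 rows, so keys 10000 to 49999 are five characters long and no longer sort numerically. The check below uses a byte-by-byte comparison as a stand-in for HBase's ordering; RowKeyOrder and its methods are hypothetical names for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

// Hypothetical helpers that mimic how HBase orders row keys
public static class RowKeyOrder
{
    // Compares two keys byte by byte (byte is unsigned in C#);
    // a key that is a prefix of another sorts first
    public static int CompareBytes(byte[] a, byte[] b)
    {
        int n = Math.Min(a.Length, b.Length);
        for (int i = 0; i < n; i++)
        {
            if (a[i] != b[i]) return a[i].CompareTo(b[i]);
        }
        return a.Length.CompareTo(b.Length);
    }

    // Sorts string keys by their UTF-8 bytes
    public static List<string> Sort(IEnumerable<string> keys)
    {
        List<string> list = keys.ToList();
        list.Sort((x, y) => CompareBytes(Encoding.UTF8.GetBytes(x), Encoding.UTF8.GetBytes(y)));
        return list;
    }

    // True if key lies in [startRow, stopRow): start inclusive, stop exclusive
    public static bool InScanRange(string key, string startRow, string stopRow)
    {
        byte[] k = Encoding.UTF8.GetBytes(key);
        return CompareBytes(k, Encoding.UTF8.GetBytes(startRow)) >= 0
            && CompareBytes(k, Encoding.UTF8.GetBytes(stopRow)) < 0;
    }
}
```

Sorting "10000", "1001", "0999" and "1000" gives 0999, 1000, 10000, 1001, so row 10000 lands between rows 1000 and 1001. CaseForScan's range "0001" to "0015" still returns rows 0001 through 0014; padding to five digits would keep all 50000 keys in numeric order, at the cost of changing the keys used in CaseForGet and CaseForScan.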


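Appendix A: Thrift2

thrift and thrift2 differ considerably: thrift2 simplifies and consolidates the API and drops the DDL-related operations. Since thrift is expected to be abandoned in the future, the use of thrift2 will be covered in a follow-up article.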

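Appendix B: Paging in HBase

Paging in HBase works differently from paging in a traditional relational database. The usual steps are:

1. Open a Scanner instance (for example by calling ScannerOpen()), which returns an id.
2. Call ScannerGetList() with that id (nbRows sets how many records each call returns); it returns a list of records.
3. Call ScannerGetList() repeatedly until a call returns no results.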
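Because a scanner only moves forward, a common way to serve one page at a time is to remember the last row key of the current page and start the next scan just after it. For byte-ordered keys, the smallest key strictly greater than k is k followed by a single 0x00 byte. A minimal sketch of that idea, with hypothetical names (Paging, NextStartRow, ReadPage) and an in-memory sorted key list standing in for ScannerOpen plus one ScannerGetList call:

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical paging helpers; the in-memory key list stands in for a table
public static class Paging
{
    // Smallest row key that sorts strictly after lastRow: lastRow plus one
    // 0x00 byte ('\0' encodes to the single byte 0x00 in UTF-8)
    public static string NextStartRow(string lastRow)
    {
        return lastRow + "\0";
    }

    // Simulates ScannerOpen(table, startRow, ...) followed by one
    // ScannerGetList(id, pageSize). Keys must already be in HBase order;
    // for ASCII keys, ordinal string order matches byte order.
    public static List<string> ReadPage(IEnumerable<string> sortedKeys, string startRow, int pageSize)
    {
        return sortedKeys
            .Where(k => string.CompareOrdinal(k, startRow) >= 0)
            .Take(pageSize)
            .ToList();
    }
}
```

With the real service, each page would be ScannerOpen(table, NextStartRow(lastKey), columns, attributes), a single ScannerGetList(id, pageSize), then ScannerClose(id). Jumping straight to page N still requires walking pages 1 to N-1, unless the client caches the last key of each page.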