Working with HBase from C# via Thrift 1
2016-07-22 11:21
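What is Thrift?
Thrift is an RPC (remote procedure call) software framework for developing scalable, cross-language services. It combines a software stack with a code generation engine to build services that work efficiently and seamlessly across C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml.
Accessing HBase from C# via Thrift
HBase provides a Thrift server, so a C# program can create a Thrift client and connect to HBase through it.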
Preparation
Download Thrift
Compiler: Thrift compiler for Windows (thrift-0.9.3.exe)
Source code and driver library: thrift-0.9.3.tar.gz
Download the HBase package
HBase-1.2.2-bin.tar.gz
Generating the C# Thrift client
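1. Create a folder to hold all the materials. This article uses workspace as the example.
2. Extract thrift-0.9.3.tar.gz, locate lib/csharp, and copy it into workspace. If needed, change the target .NET Framework from 3.5 to 4.0 or later, then build the project to obtain Thrift.dll.
3. Create a test project named HBaseTest and add a reference to Thrift.dll.
4. Extract the HBase package and copy the hbase.thrift file from it into workspace:
\hbase-1.2.2\hbase-thrift\src\main\resources\org\apache\hadoop\hbase\thrift\hbase.thrift
Note: HBase has two versions of its Thrift interface, thrift and thrift2. This article covers thrift.
5. Open a cmd command prompt, change to workspace, and run the following command to generate the code:
thrift-0.9.3.exe -gen csharp hbase.thrift
This creates a gen-csharp directory containing the HBase Thrift client code.
6. Add the generated code to the HBaseTest project.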
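Development
AbstractHBaseThriftService: the abstract service
The code here is essentially a C# translation of the Java code from the write-up "HBase Thrift客户端Java API实践" (the HBase Thrift client Java API in practice). On the client side we add a layer of abstraction that makes it easier to pass the various parameters. The abstract class is AbstractHBaseThriftService, implemented as follows: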
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Thrift.Transport;
using Thrift.Protocol;

namespace HBaseTest
{
    /// <summary>
    /// Abstract service for the HBase Thrift client.
    /// </summary>
    public abstract class AbstractHBaseThriftService
    {
        protected static readonly string CHARSET = "UTF-8";
        private string host = "localhost";
        private int port = 9090;
        private readonly TTransport transport;
        protected readonly Hbase.Client client;

        public AbstractHBaseThriftService() : this("localhost", 9090)
        {
        }

        public AbstractHBaseThriftService(string host, int port)
        {
            this.host = host;
            this.port = port;
            transport = new TSocket(host, port);
            TProtocol protocol = new TBinaryProtocol(transport, true, true);
            client = new Hbase.Client(protocol);
        }

        /// <summary>Opens the communication channel.</summary>
        public void Open()
        {
            if (transport != null)
            {
                transport.Open();
            }
        }

        /// <summary>Closes the communication channel.</summary>
        public void Close()
        {
            if (transport != null)
            {
                transport.Close();
            }
        }

        /// <summary>Gets all user tables in HBase.</summary>
        public abstract List<string> GetTables();

        /// <summary>Updates a single column of a row.</summary>
        public abstract void Update(
            string table, string rowKey, bool writeToWal,
            string fieldName, string fieldValue,
            Dictionary<string, string> attributes);

        /// <summary>Updates several columns of a row in one call.</summary>
        public abstract void Update(
            string table, string rowKey, bool writeToWal,
            Dictionary<string, string> fieldNameValues,
            Dictionary<string, string> attributes);

        /// <summary>Deletes a cell from a table.</summary>
        /// <param name="table">Table name</param>
        /// <param name="rowKey">Row key</param>
        /// <param name="column">Column to delete (the test cases pass values such as info:birthday)</param>
        /// <param name="attributes">Attributes</param>
        public abstract void DeleteCell(
            string table, string rowKey, bool writeToWal,
            string column,
            Dictionary<string, string> attributes);

        /// <summary>Deletes the specified cells from a table.</summary>
        /// <param name="table">Table name</param>
        /// <param name="rowKey">Row key</param>
        /// <param name="columns">Columns to delete</param>
        /// <param name="attributes">Attributes</param>
        public abstract void DeleteCells(
            string table, string rowKey, bool writeToWal,
            List<string> columns,
            Dictionary<string, string> attributes);

        /// <summary>Deletes a row.</summary>
        /// <param name="table">Table name</param>
        /// <param name="rowKey">Row key</param>
        /// <param name="attributes">Attributes</param>
        public abstract void DeleteRow(
            string table, string rowKey,
            Dictionary<string, string> attributes);

        /// <summary>Opens a scanner starting at startRow and returns its id.</summary>
        public abstract int ScannerOpen(
            string table, string startRow,
            List<string> columns,
            Dictionary<string, string> attributes);

        /// <summary>Opens a scanner from startRow to stopRow and returns its id.</summary>
        public abstract int ScannerOpen(
            string table, string startRow, string stopRow,
            List<string> columns,
            Dictionary<string, string> attributes);

        /// <summary>Opens a scanner over rows with the given prefix and returns its id.</summary>
        public abstract int ScannerOpenWithPrefix(
            string table, string startAndPrefix,
            List<string> columns,
            Dictionary<string, string> attributes);

        /// <summary>Opens a scanner starting at startRow for the given timestamp and returns its id.</summary>
        public abstract int ScannerOpenTs(
            string table, string startRow,
            List<string> columns, long timestamp,
            Dictionary<string, string> attributes);

        /// <summary>Opens a scanner from startRow to stopRow for the given timestamp and returns its id.</summary>
        public abstract int ScannerOpenTs(
            string table, string startRow, string stopRow,
            List<string> columns, long timestamp,
            Dictionary<string, string> attributes);

        /// <summary>Gets a list of rows from a scanner.</summary>
        public abstract List<TRowResult> ScannerGetList(int id, int nbRows);

        /// <summary>Gets row data from a scanner.</summary>
        public abstract List<TRowResult> ScannerGet(int id);

        /// <summary>Gets the specified row.</summary>
        public abstract List<TRowResult> GetRow(
            string table, string row,
            Dictionary<string, string> attributes);

        /// <summary>Gets several rows in a batch.</summary>
        public abstract List<TRowResult> GetRows(
            string table, List<string> rows,
            Dictionary<string, string> attributes);

        /// <summary>Gets several rows in a batch, restricted to the given columns.</summary>
        public abstract List<TRowResult> GetRowsWithColumns(
            string table, List<string> rows, List<string> columns,
            Dictionary<string, string> attributes);

        /// <summary>Closes a scanner.</summary>
        public abstract void ScannerClose(int id);

        /// <summary>Iterates over a result.</summary>
        public abstract void IterateResults(TRowResult result);
    }
}
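Basic functions of the Thrift service
- Open a connection to the Thrift service: Open()
- Get the names of all tables in HBase: GetTables()
- Update records in an HBase table: Update()
- Delete data (cells) from a row of an HBase table: DeleteCell() and DeleteCells()
- Delete a row from an HBase table: DeleteRow()
- Open a Scanner, which returns an id: ScannerOpen(), ScannerOpenWithPrefix() and ScannerOpenTs(); then iterate over the records with the returned id: ScannerGetList() and ScannerGet()
- Get row results: GetRow(), GetRows() and GetRowsWithColumns()
- Close a Scanner: ScannerClose()
- Iterate over a result, for debugging: IterateResults()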
HBaseThriftService: the concrete service
Based on the abstract HBase Thrift service definition, one implementation is as follows:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace HBaseTest
{
    /// <summary>
    /// HBase Thrift service.
    /// </summary>
    public class HBaseThriftService : AbstractHBaseThriftService
    {
        public HBaseThriftService() : this("localhost", 9090)
        {
        }

        public HBaseThriftService(string host, int port) : base(host, port)
        {
        }

        /// <inheritdoc />
        public override List<string> GetTables()
        {
            List<byte[]> tables = client.getTableNames();
            List<String> list = new List<String>();
            foreach (byte[] table in tables)
            {
                list.Add(Decode(table));
            }
            return list;
        }

        /// <inheritdoc />
        public override void Update(string table, string rowKey, bool writeToWal,
            string fieldName, string fieldValue, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] row = Encode(rowKey);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            List<Mutation> mutations = new List<Mutation>();
            Mutation mutation = new Mutation();
            mutation.IsDelete = false;
            mutation.WriteToWAL = writeToWal;
            mutation.Column = Encode(fieldName);
            mutation.Value = Encode(fieldValue);
            mutations.Add(mutation);
            client.mutateRow(tableName, row, mutations, encodedAttributes);
        }

        /// <inheritdoc />
        public override void Update(string table, string rowKey, bool writeToWal,
            Dictionary<string, string> fieldNameValues, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] row = Encode(rowKey);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            List<Mutation> mutations = new List<Mutation>();
            // One put mutation per column; all of them are sent in a single mutateRow call.
            foreach (KeyValuePair<String, String> pair in fieldNameValues)
            {
                Mutation mutation = new Mutation();
                mutation.IsDelete = false;
                mutation.WriteToWAL = writeToWal;
                mutation.Column = Encode(pair.Key);
                mutation.Value = Encode(pair.Value);
                mutations.Add(mutation);
            }
            client.mutateRow(tableName, row, mutations, encodedAttributes);
        }

        /// <inheritdoc />
        public override void DeleteCell(string table, string rowKey, bool writeToWal,
            string column, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] row = Encode(rowKey);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            List<Mutation> mutations = new List<Mutation>();
            Mutation mutation = new Mutation();
            mutation.IsDelete = true;
            mutation.WriteToWAL = writeToWal;
            mutation.Column = Encode(column);
            mutations.Add(mutation);
            client.mutateRow(tableName, row, mutations, encodedAttributes);
        }

        /// <inheritdoc />
        public override void DeleteCells(string table, string rowKey, bool writeToWal,
            List<string> columns, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] row = Encode(rowKey);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            List<Mutation> mutations = new List<Mutation>();
            // One delete mutation per column.
            foreach (string column in columns)
            {
                Mutation mutation = new Mutation();
                mutation.IsDelete = true;
                mutation.WriteToWAL = writeToWal;
                mutation.Column = Encode(column);
                mutations.Add(mutation);
            }
            client.mutateRow(tableName, row, mutations, encodedAttributes);
        }

        /// <inheritdoc />
        public override void DeleteRow(string table, string rowKey,
            Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] row = Encode(rowKey);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            client.deleteAllRow(tableName, row, encodedAttributes);
        }

        /// <inheritdoc />
        public override int ScannerOpen(string table, string startRow,
            List<string> columns, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] start = Encode(startRow);
            List<byte[]> encodedColumns = EncodeStringList(columns);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.scannerOpen(tableName, start, encodedColumns, encodedAttributes);
        }

        /// <inheritdoc />
        public override int ScannerOpen(string table, string startRow, string stopRow,
            List<string> columns, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] start = Encode(startRow);
            byte[] stop = Encode(stopRow);
            List<byte[]> encodedColumns = EncodeStringList(columns);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.scannerOpenWithStop(tableName, start, stop, encodedColumns, encodedAttributes);
        }

        /// <inheritdoc />
        public override int ScannerOpenWithPrefix(string table, string startAndPrefix,
            List<string> columns, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] prefix = Encode(startAndPrefix);
            List<byte[]> encodedColumns = EncodeStringList(columns);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.scannerOpenWithPrefix(tableName, prefix, encodedColumns, encodedAttributes);
        }

        /// <inheritdoc />
        public override int ScannerOpenTs(string table, string startRow,
            List<string> columns, long timestamp, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] start = Encode(startRow);
            List<byte[]> encodedColumns = EncodeStringList(columns);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.scannerOpenTs(tableName, start, encodedColumns, timestamp, encodedAttributes);
        }

        /// <inheritdoc />
        public override int ScannerOpenTs(string table, string startRow, string stopRow,
            List<string> columns, long timestamp, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] start = Encode(startRow);
            byte[] stop = Encode(stopRow);
            List<byte[]> encodedColumns = EncodeStringList(columns);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.scannerOpenWithStopTs(tableName, start, stop, encodedColumns, timestamp, encodedAttributes);
        }

        /// <inheritdoc />
        public override List<TRowResult> ScannerGetList(int id, int nbRows)
        {
            return client.scannerGetList(id, nbRows);
        }

        /// <inheritdoc />
        public override List<TRowResult> ScannerGet(int id)
        {
            return client.scannerGet(id);
        }

        /// <inheritdoc />
        public override List<TRowResult> GetRow(string table, string row,
            Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] startRow = Encode(row);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.getRow(tableName, startRow, encodedAttributes);
        }

        /// <inheritdoc />
        public override List<TRowResult> GetRows(string table, List<string> rows,
            Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            List<byte[]> encodedRows = EncodeStringList(rows);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.getRows(tableName, encodedRows, encodedAttributes);
        }

        /// <inheritdoc />
        public override List<TRowResult> GetRowsWithColumns(string table, List<string> rows,
            List<string> columns, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            List<byte[]> encodedRows = EncodeStringList(rows);
            List<byte[]> encodedColumns = EncodeStringList(columns);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.getRowsWithColumns(tableName, encodedRows, encodedColumns, encodedAttributes);
        }

        /// <inheritdoc />
        public override void ScannerClose(int id)
        {
            client.scannerClose(id);
        }

        /// <inheritdoc />
        public override void IterateResults(TRowResult result)
        {
            foreach (KeyValuePair<byte[], TCell> pair in result.Columns)
            {
                Console.WriteLine("\tCol=" + Decode(pair.Key) + ", Value=" + Decode(pair.Value.Value));
            }
        }

        // Decodes UTF-8 bytes into a string. Encoding.UTF8 is used explicitly because
        // UTF8Encoding.Default resolves to the inherited Encoding.Default, which on
        // .NET Framework is the system ANSI code page rather than UTF-8.
        private String Decode(byte[] bs)
        {
            return Encoding.UTF8.GetString(bs);
        }

        // Encodes a string as UTF-8 bytes.
        private byte[] Encode(String str)
        {
            return Encoding.UTF8.GetBytes(str);
        }

        // Encodes attribute keys and values; a null dictionary yields an empty result.
        private Dictionary<byte[], byte[]> EncodeAttributes(Dictionary<String, String> attributes)
        {
            Dictionary<byte[], byte[]> encodedAttributes = new Dictionary<byte[], byte[]>();
            if (attributes != null)
            {
                foreach (KeyValuePair<String, String> pair in attributes)
                {
                    encodedAttributes.Add(Encode(pair.Key), Encode(pair.Value));
                }
            }
            return encodedAttributes;
        }

        // Encodes every string in the list; a null list yields an empty result.
        private List<byte[]> EncodeStringList(List<String> strings)
        {
            List<byte[]> list = new List<byte[]>();
            if (strings != null)
            {
                foreach (String str in strings)
                {
                    list.Add(Encode(str));
                }
            }
            return list;
        }
    }
}
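The service exchanges every name and value with the Thrift client as byte[], and IterateResults walks result.Columns, whose keys are byte[]. The following is a minimal sketch, not part of the original article, of one way to turn such a map into a string-keyed one for lookups. The class ColumnDecodingSketch and the method ToStringMap are hypothetical names, and the TCell values are reduced to plain byte[] so the example runs without the generated code.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper, not part of the generated Thrift code.
public static class ColumnDecodingSketch
{
    // Converts a byte[]-keyed map into a string-keyed map by decoding
    // both keys and values as UTF-8.
    public static Dictionary<string, string> ToStringMap(Dictionary<byte[], byte[]> raw)
    {
        var decoded = new Dictionary<string, string>();
        foreach (KeyValuePair<byte[], byte[]> pair in raw)
        {
            decoded[Encoding.UTF8.GetString(pair.Key)] = Encoding.UTF8.GetString(pair.Value);
        }
        return decoded;
    }

    public static void Main()
    {
        var raw = new Dictionary<byte[], byte[]>
        {
            { Encoding.UTF8.GetBytes("info:name"), Encoding.UTF8.GetBytes("Alice") },
            { Encoding.UTF8.GetBytes("info:gender"), Encoding.UTF8.GetBytes("F") }
        };

        // A freshly encoded key is a different array instance, and the default
        // comparer compares arrays by reference, so this lookup prints False.
        Console.WriteLine(raw.ContainsKey(Encoding.UTF8.GetBytes("info:name")));

        // After decoding, columns can be looked up by name; prints Alice.
        Dictionary<string, string> decoded = ToStringMap(raw);
        Console.WriteLine(decoded["info:name"]);
    }
}
```

Decoding once into strings avoids having to supply a custom byte-array comparer wherever a column needs to be found by name.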
HBaseThrift test cases
We wrote the following test cases for the HBase Thrift interface:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace HBaseTest
{
    class Test
    {
        // One shared Random instance: creating a new Random on every call inside a
        // tight loop can repeat values when the seed is derived from the clock.
        private static readonly Random random = new Random();

        private readonly AbstractHBaseThriftService thriftService;

        public Test(String host, int port)
        {
            thriftService = new HBaseThriftService(host, port);
            thriftService.Open();
        }

        public Test() : this("master", 9090)
        {
        }

        static String RandomlyBirthday()
        {
            int year = 1900 + random.Next(100);
            int month = 1 + random.Next(12);
            int date = 1 + random.Next(30);
            return year + "-" + month.ToString().PadLeft(2, '0') + "-" + date.ToString().PadLeft(2, '0');
        }

        static String RandomlyGender()
        {
            int flag = random.Next(2);
            return flag == 0 ? "M" : "F";
        }

        static String RandomlyUserType()
        {
            int flag = 1 + random.Next(10);
            return flag.ToString();
        }

        public void Close()
        {
            thriftService.Close();
        }

        public void CaseForGetTables()
        {
            var tableNameList = thriftService.GetTables();
            foreach (var existedTableName in tableNameList)
            {
                Console.WriteLine("Table:{0}", existedTableName);
            }
        }

        public void CaseForCreateTable(string tableName, IList<string> columnNameList)
        {
            // Note: CreateTable is not declared in the AbstractHBaseThriftService listing
            // in this article; it has to be added to the service before this compiles.
            thriftService.CreateTable(tableName, columnNameList);
        }

        public void CaseForUpdate()
        {
            bool writeToWal = false;
            Dictionary<String, String> attributes = new Dictionary<String, String>(0);
            string table = SetTable();

            // put kv pairs
            System.Diagnostics.Stopwatch watch = new System.Diagnostics.Stopwatch();
            watch.Start();
            for (long i = 0; i < 50000; i++)
            {
                string rowKey = i.ToString().PadLeft(4, '0');
                Dictionary<String, String> fieldNameValues = new Dictionary<String, String>();
                // RandomNameGen is a helper class that is not shown in this article.
                fieldNameValues.Add("info:name", RandomNameGen.getRandomName());
                fieldNameValues.Add("info:birthday", RandomlyBirthday());
                fieldNameValues.Add("info:user_type", RandomlyUserType());
                fieldNameValues.Add("info:gender", RandomlyGender());
                thriftService.Update(table, rowKey, writeToWal, fieldNameValues, attributes);
                if (i % 10000 == 0)
                {
                    Console.WriteLine("Insert [{0}] Rows Time Elapsed:{1}", i, watch.ElapsedMilliseconds);
                }
            }
            watch.Stop();
            Console.WriteLine("Insert [{0}] Rows Time Elapsed:{1}", 50000, watch.ElapsedMilliseconds);
        }

        public void CaseForDeleteCells()
        {
            bool writeToWal = false;
            Dictionary<String, String> attributes = new Dictionary<String, String>(0);
            String table = SetTable();

            // delete the info:birthday cell of rows 0005 to 0009
            for (long i = 5; i < 10; i++)
            {
                String rowKey = i.ToString().PadLeft(4, '0');
                List<String> columns = new List<String>(0);
                columns.Add("info:birthday");
                thriftService.DeleteCells(table, rowKey, writeToWal, columns, attributes);
            }
        }

        public void CaseForDeleteRow()
        {
            Dictionary<String, String> attributes = new Dictionary<String, String>(0);
            String table = SetTable();

            // delete rows
            for (long i = 5; i < 10; i++)
            {
                String rowKey = i.ToString().PadLeft(4, '0');
                thriftService.DeleteRow(table, rowKey, attributes);
            }
        }

        public void CaseForScan()
        {
            Dictionary<String, String> attributes = new Dictionary<String, String>(0);
            String table = SetTable();
            String startRow = "0001";
            String stopRow = "0015";
            List<String> columns = new List<String>(0);
            columns.Add("info:name");
            int id = thriftService.ScannerOpen(table, startRow, stopRow, columns, attributes);
            int nbRows = 2;

            // Fetch nbRows rows at a time until the scanner returns nothing.
            List<TRowResult> results = thriftService.ScannerGetList(id, nbRows);
            while (results?.Count > 0)
            {
                foreach (TRowResult result in results)
                {
                    thriftService.IterateResults(result);
                }
                results = thriftService.ScannerGetList(id, nbRows);
            }
            thriftService.ScannerClose(id);
        }

        public void CaseForGet()
        {
            Dictionary<String, String> attributes = new Dictionary<String, String>(0);
            String table = SetTable();
            List<String> rows = new List<String>(0);
            rows.Add("0009");
            rows.Add("0098");
            rows.Add("0999");
            List<String> columns = new List<String>(0);
            columns.Add("info:birthday");
            columns.Add("info:gender");
            List<TRowResult> results = thriftService.GetRowsWithColumns(table, rows, columns, attributes);
            foreach (TRowResult result in results)
            {
                thriftService.IterateResults(result);
            }
        }

        private string SetTable()
        {
            string table = "test-user";
            return table;
        }
    }
}
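The test cases build row keys with PadLeft(4, '0') and then scan between the string keys "0001" and "0015". HBase orders row keys as byte strings, not as numbers, so the padding width decides where keys land in a scan. The sketch below is an illustration under that assumption and does not talk to HBase; RowKeyOrderSketch, MakeKey and SortedKeys are hypothetical names, and ordinal string comparison stands in for byte order, which is equivalent for ASCII digits.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration of row-key ordering; no HBase connection involved.
public static class RowKeyOrderSketch
{
    // Builds a row key the same way the test cases do.
    public static string MakeKey(long i, int width)
    {
        return i.ToString().PadLeft(width, '0');
    }

    // Sorts the keys by ordinal comparison, which matches byte order for ASCII digits.
    public static List<string> SortedKeys(IEnumerable<long> ids, int width)
    {
        var keys = new List<string>();
        foreach (long id in ids)
        {
            keys.Add(MakeKey(id, width));
        }
        keys.Sort(string.CompareOrdinal);
        return keys;
    }

    public static void Main()
    {
        long[] ids = { 9, 10, 999, 1000, 1001, 10000 };

        // Width 4: prints 0009 0010 0999 1000 10000 1001
        Console.WriteLine(string.Join(" ", SortedKeys(ids, 4)));

        // Width 5: prints 00009 00010 00999 01000 01001 10000
        Console.WriteLine(string.Join(" ", SortedKeys(ids, 5)));
    }
}
```

With a width of 4, the key for row 10000 sorts between 1000 and 1001. Padding to the width of the largest expected id keeps string order and numeric order the same.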
Program: the console application entry class
With the test cases ready, call them from the console application's entry point:
using System;
using System.Diagnostics;
using System.IO;
using System.Text;

namespace HBaseTest
{
    class Program
    {
        static void Main(string[] args)
        {
            Test test = new Test();
            test.CaseForGetTables();
            test.CaseForCreateTable("test-user", new[] { "info" });
            test.CaseForGetTables();
            test.CaseForUpdate();
            test.CaseForGet();
            test.CaseForScan();
            test.CaseForDeleteCells();
            test.CaseForDeleteRow();
            Console.ReadLine();
        }
    }
}
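Appendix A: Thrift2
thrift and thrift2 differ considerably. thrift2 simplifies and consolidates the interface and removes everything related to DDL. Because thrift is expected to be dropped in the future, a follow-up article will cover how to use thrift2.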
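Appendix B: Steps for paging in HBase
Paging in HBase works somewhat differently from paging in a traditional relational database. The usual steps are:
1. Open a Scanner instance (for example by calling ScannerOpen()), which returns an id.
2. Call ScannerGetList() with that id (the nbRows variable sets how many records each call returns) to get a list of records.
3. Call ScannerGetList() repeatedly until a call returns no results.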
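The loop described in these steps can be sketched as follows. This is a minimal illustration and not the author's code: FakeScanner is a hypothetical in-memory stand-in for an opened HBase scanner, and its GetList method only mimics the shape of ScannerGetList(id, nbRows), so the example runs without a Thrift connection.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sketch of scanner-based paging; no HBase connection involved.
public static class ScannerPagingSketch
{
    // In-memory stand-in for a scanner: hands out rows in batches and
    // returns an empty list once the rows are exhausted.
    public sealed class FakeScanner
    {
        private readonly List<string> rows;
        private int position;

        public FakeScanner(IEnumerable<string> rows)
        {
            this.rows = rows.ToList();
        }

        // Returns at most nbRows rows per call, continuing where the last call stopped.
        public List<string> GetList(int nbRows)
        {
            List<string> batch = rows.Skip(position).Take(nbRows).ToList();
            position += batch.Count;
            return batch;
        }
    }

    // Keeps requesting batches until a call returns no rows; each batch is one page.
    public static List<List<string>> ReadPages(FakeScanner scanner, int nbRows)
    {
        var pages = new List<List<string>>();
        List<string> batch = scanner.GetList(nbRows);
        while (batch.Count > 0)
        {
            pages.Add(batch);
            batch = scanner.GetList(nbRows);
        }
        return pages;
    }

    public static void Main()
    {
        var scanner = new FakeScanner(new[] { "0001", "0002", "0003", "0004", "0005" });
        List<List<string>> pages = ReadPages(scanner, 2);
        for (int i = 0; i < pages.Count; i++)
        {
            Console.WriteLine("Page {0}: {1}", i + 1, string.Join(", ", pages[i]));
        }
    }
}
```

With five rows and nbRows set to 2, the loop yields three pages, the last holding a single row. Against a real service, the scanner would also be closed with ScannerClose() once the loop ends, as CaseForScan does.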