Building a Secondary Index for HBase with Solr
2016-06-17 11:36
Building a secondary index for HBase with Solr: a RegionObserver coprocessor intercepts every Put and Delete on the indexed table and mirrors the change into a SolrCloud collection, so the indexed columns can be searched in Solr and resolved back to HBase rowkeys.
1. Write the coprocessor. postPut pushes the indexed columns of each new Put into Solr, and postDelete either removes the Solr document or re-indexes the row's remaining cells; whenever a Solr call fails, the change is written to a backup table (temp_student) so it can be replayed later:
package com;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.common.SolrInputDocument;

public class SolrCoprocessor extends BaseRegionObserver {

    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e,
            Put put, WALEdit edit, Durability durability) throws IOException {
        // Mirror the new/updated row into the Solr index after the Put has been applied
        insertSolr(put);
        super.postPut(e, put, edit, durability);
    }

    public void insertSolr(Put put) {
        // NOTE: creating a CloudSolrServer per mutation is expensive; a production
        // version should share a single instance across calls
        final String zkHost = "IP:2181,IP:2181,IP:2181";
        final int zkConnectTimeout = 10000; // ZooKeeper connect timeout, in milliseconds
        CloudSolrServer cloudSolrServer = new CloudSolrServer(zkHost);
        cloudSolrServer.setZkConnectTimeout(zkConnectTimeout);
        cloudSolrServer.connect();
        cloudSolrServer.setDefaultCollection("student");
        SolrInputDocument doc = new SolrInputDocument();
        // The HBase rowkey doubles as the Solr document id
        doc.addField("id", Bytes.toString(put.getRow()));
        for (Cell c : put.getFamilyCellMap().get(Bytes.toBytes("info"))) {
            String key = Bytes.toString(CellUtil.cloneQualifier(c));
            String value = Bytes.toString(CellUtil.cloneValue(c));
            System.out.println("key = " + key);
            System.out.println("value = " + value);
            if (key.equalsIgnoreCase("name") || key.equalsIgnoreCase("address")) {
                // Solr atomic update: "set" replaces just this field of the document
                Map<String, Object> oper = new HashMap<String, Object>();
                oper.put("set", value);
                doc.addField(key, oper);
            }
        }
        try {
            cloudSolrServer.add(doc);
            cloudSolrServer.commit(true, true, true);
            cloudSolrServer.shutdown();
        } catch (Exception e) {
            // Indexing failed: save the change to the backup table for later replay
            e.printStackTrace();
            cloudSolrServer.shutdown();
            putData("temp_student", put);
        }
    }

    // If indexing a Put to Solr fails, save the data to the backup table
    public void putData(String tablename, Put put) {
        Configuration hb_conf = HBaseConfiguration.create();
        HTable table;
        // Backup record format: operate \001 rowkey \001 key \003 value (\002 key \003 value)*
        String content = "";
        long time = System.currentTimeMillis();
        String row = time + ""; // the current timestamp serves as the backup rowkey
        String operate = "put";
        content = operate;
        String rowkey = Bytes.toString(put.getRow());
        content = content + "\001" + rowkey;
        content = content + "\001";
        for (Cell c : put.getFamilyCellMap().get(Bytes.toBytes("info"))) {
            String key = Bytes.toString(CellUtil.cloneQualifier(c));
            String value = Bytes.toString(CellUtil.cloneValue(c));
            System.out.println("key = " + key);
            System.out.println("value = " + value);
            if (key.equalsIgnoreCase("name") || key.equalsIgnoreCase("address")) {
                content = content + key + "\003" + value + "\002";
            }
        }
        content = content.substring(0, content.length() - 1); // strip the trailing delimiter
        try {
            table = new HTable(hb_conf, tablename);
            Put newPut = new Put(Bytes.toBytes(row));
            newPut.add(Bytes.toBytes("info"), Bytes.toBytes("content"),
                    Bytes.toBytes(content));
            table.put(newPut);
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e,
            Delete delete, WALEdit edit, Durability durability)
            throws IOException {
        // Keep the Solr index in sync after a Delete has been applied
        deleteSolr(delete);
        super.postDelete(e, delete, edit, durability);
    }

    public void deleteSolr(Delete delete) {
        final String zkHost = "IP:2181,IP:2181,IP:2181";
        final int zkConnectTimeout = 10000; // ZooKeeper connect timeout, in milliseconds
        CloudSolrServer cloudSolrServer = new CloudSolrServer(zkHost);
        cloudSolrServer.setZkConnectTimeout(zkConnectTimeout);
        cloudSolrServer.connect();
        cloudSolrServer.setDefaultCollection("student");
        Configuration hb_conf = HBaseConfiguration.create();
        HTable table;
        try {
            // postDelete runs after the delete, so a Get shows what is left of the row
            table = new HTable(hb_conf, "student");
            Get get = new Get(delete.getRow());
            Result result = table.get(get);
            table.close();
            if (result.size() == 0) {
                // The whole row is gone: remove the document from Solr
                try {
                    cloudSolrServer.deleteById(Bytes.toString(delete.getRow()));
                    cloudSolrServer.commit(true, true, true);
                    cloudSolrServer.shutdown();
                } catch (Exception e) {
                    cloudSolrServer.shutdown();
                    deleteData("temp_student", get);
                }
            } else {
                // Only some cells were deleted: re-index the remaining cells
                SolrInputDocument doc = new SolrInputDocument();
                doc.addField("id", Bytes.toString(delete.getRow()));
                List<Cell> list_cell = result.listCells();
                for (int i = 0; i < list_cell.size(); i++) {
                    Cell cell = list_cell.get(i);
                    String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    System.out.println("key = " + key + " value = " + value);
                    doc.addField(key, value);
                }
                try {
                    cloudSolrServer.add(doc);
                    cloudSolrServer.commit(true, true, true);
                    cloudSolrServer.shutdown();
                } catch (Exception e) {
                    cloudSolrServer.shutdown();
                    addData("temp_student", result);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // If deleting from Solr fails, save the rowkey to the backup table
    public void deleteData(String tablename, Get get) {
        Configuration hb_conf = HBaseConfiguration.create();
        HTable table;
        long time = System.currentTimeMillis();
        String row = time + "";
        String operate = "delete";
        String content = operate;
        String rowkey = Bytes.toString(get.getRow());
        content = content + "\001" + rowkey;
        try {
            table = new HTable(hb_conf, tablename);
            Put newPut = new Put(Bytes.toBytes(row));
            newPut.add(Bytes.toBytes("info"), Bytes.toBytes("content"),
                    Bytes.toBytes(content));
            table.put(newPut);
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // If re-indexing the remaining cells to Solr fails, save them to the backup table
    public void addData(String tablename, Result result) {
        Configuration hb_conf = HBaseConfiguration.create();
        HTable table;
        long time = System.currentTimeMillis();
        String row = time + "";
        String operate = "add";
        String content = operate;
        String rowkey = Bytes.toString(result.getRow());
        content = content + "\001" + rowkey;
        content = content + "\001";
        try {
            List<Cell> list_cell = result.listCells();
            for (int i = 0; i < list_cell.size(); i++) {
                Cell cell = list_cell.get(i);
                String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                String value = Bytes.toString(CellUtil.cloneValue(cell));
                System.out.println("key = " + key + " value = " + value);
                content = content + key + "\003" + value + "\002";
            }
            content = content.substring(0, content.length() - 1); // strip the trailing \002
            table = new HTable(hb_conf, tablename);
            Put newPut = new Put(Bytes.toBytes(row));
            newPut.add(Bytes.toBytes("info"), Bytes.toBytes("content"),
                    Bytes.toBytes(content));
            table.put(newPut);
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
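With the coprocessor keeping the index in sync, a secondary-index lookup becomes a two-step read: query Solr for the matching document ids (which are HBase rowkeys), then fetch the full rows from HBase. The sketch below illustrates this read path under the same assumptions as the code above (SolrJ 4.x CloudSolrServer, the "student" collection and table); the ZooKeeper address and the query term name:zhangsan are placeholders.

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.common.SolrDocument;

public class SolrIndexQuery {
    public static void main(String[] args) throws Exception {
        CloudSolrServer solr = new CloudSolrServer("IP:2181,IP:2181,IP:2181");
        solr.setDefaultCollection("student");
        solr.connect();
        // Step 1: the secondary-index lookup; Solr document ids are HBase rowkeys
        SolrQuery query = new SolrQuery("name:zhangsan"); // placeholder query
        query.setFields("id");
        List<Get> gets = new ArrayList<Get>();
        for (SolrDocument d : solr.query(query).getResults()) {
            gets.add(new Get(Bytes.toBytes((String) d.getFieldValue("id"))));
        }
        solr.shutdown();
        // Step 2: fetch the full rows from HBase by rowkey
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "student");
        for (Result r : table.get(gets)) {
            System.out.println("row = " + Bytes.toString(r.getRow()));
        }
        table.close();
    }
}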
2. Package the coprocessor code into a jar (Test_1.jar), then either upload it to HDFS or place it in the same directory on every HBase node (in this example: /home/hadoop/Test_1.jar).
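For example, assuming the classes were compiled into a bin/ directory and using the NameNode address from step 4 (both placeholders):
jar cf Test_1.jar -C bin com
hadoop fs -put Test_1.jar hdfs://IP:9000/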
3. Copy the relevant Solr client jars into HBase's lib directory.
4. Attach the coprocessor to the target HBase table:
(1) disable 'student'
(2) alter 'student', 'coprocessor' => 'hdfs://IP:9000/Test_1.jar|com.SolrCoprocessor|1001|'
or, if the jar was copied to every node instead:
alter 'student', 'coprocessor' => 'file:///home/hadoop/Test_1.jar|com.SolrCoprocessor|1001|'
(3) enable 'student'
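To verify the setup, insert a test row from the HBase shell and check that it shows up in Solr (the row and values below are hypothetical):
put 'student', 'row1', 'info:name', 'zhangsan'
A query such as http://IP:8983/solr/student/select?q=name:zhangsan should then return a document with id row1.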
5. Additionally, the command to remove the coprocessor from the table is:
alter 'student', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
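As a final note, the backup records written by putData, deleteData and addData encode each failed operation as operate \001 rowkey \001 key \003 value \002 key \003 value. A replay job could decode one backup record roughly as follows (a sketch only; BackupRecordParser is a hypothetical name, not part of the original code):

public class BackupRecordParser {
    // Decode one temp_student record back into operation, rowkey, and field pairs
    public static void parse(String content) {
        String[] parts = content.split("\001");
        String operate = parts[0]; // "put", "delete", or "add"
        String rowkey = parts[1];
        System.out.println("operate = " + operate + ", rowkey = " + rowkey);
        if (parts.length > 2) { // "delete" records carry no field data
            for (String kv : parts[2].split("\002")) {
                String[] pair = kv.split("\003");
                // a replay job would push these fields back into Solr, as insertSolr() does
                System.out.println("  " + pair[0] + " = " + pair[1]);
            }
        }
    }
}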