您的位置:首页 > 其它

利用Solr建立HBase的二级索引

2016-06-17 11:36 423 查看
利用Solr建立HBase的二级索引:

1、编写的协处理器代码为

package com;

import java.io.IOException;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.Cell;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.KeyValue;

import org.apache.hadoop.hbase.client.Delete;

import org.apache.hadoop.hbase.client.Durability;

import org.apache.hadoop.hbase.client.Get;

import org.apache.hadoop.hbase.client.HTable;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;

import org.apache.hadoop.hbase.coprocessor.ObserverContext;

import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;

import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.solr.client.solrj.SolrServerException;

import org.apache.solr.client.solrj.impl.CloudSolrServer;

import org.apache.solr.common.SolrInputDocument;

public class SolrCoprocessor extends BaseRegionObserver {

 @Override

 public void postPut(ObserverContext<RegionCoprocessorEnvironment> e,

   Put put, WALEdit edit, Durability durability) throws IOException {

  // TODO Auto-generated method stub

  insertSolr(put);

  super.postPut(e, put, edit, durability);

 }

 public void insertSolr(Put put) {

  CloudSolrServer cloudSolrServer;

  final String zkHost = "IP:2181,IP:2181,IP:2181";

  final int zkConnectTimeout = 1;

  cloudSolrServer = new CloudSolrServer(zkHost);

  cloudSolrServer.setZkConnectTimeout(zkConnectTimeout);

  cloudSolrServer.connect();

  cloudSolrServer.setDefaultCollection("student");

  SolrInputDocument doc = new SolrInputDocument();

  doc.addField("id", Bytes.toString(put.getRow()));

  for (KeyValue c : put.getFamilyMap().get(Bytes.toBytes("info"))) {

   String key = Bytes.toString(c.getQualifier());

   String value = Bytes.toString(c.getValue());

   System.out.println("key = " + key);

   System.out.println("value = " + value);

   if (key.equalsIgnoreCase("name") || key.equalsIgnoreCase("address")) {

    Map<String, Object> oper = new HashMap<String, Object>();

    oper.put("set", value);

    doc.addField(key, oper);

   }

  }

  try {

   cloudSolrServer.add(doc);

   cloudSolrServer.commit(true, true, true);

   cloudSolrServer.shutdown();

  } catch (Exception e) {

   // TODO Auto-generated catch block

   System.out.println("11111");

   cloudSolrServer.shutdown();

   putData("temp_student", put);

   // e.printStackTrace();

  }

 }

 // 插入Solr出错,将数据放入备份表中

 public void putData(String tablename, Put put) {

  Configuration hb_conf = HBaseConfiguration.create();

  HTable table;

  String content = "";

  long time = System.currentTimeMillis();

  String row = time + "";

  String operate = "put";

  content = operate;

  String rowkey = Bytes.toString(put.getRow());

  content = content + "\001" + rowkey;

  content = content + "\001";

  for (KeyValue c : put.getFamilyMap().get(Bytes.toBytes("info"))) {

   String key = Bytes.toString(c.getQualifier());

   String value = Bytes.toString(c.getValue());

   System.out.println("key = " + key);

   System.out.println("value = " + value);

   if (key.equalsIgnoreCase("name") || key.equalsIgnoreCase("address")) {

    content = content + key + "\003" + value + "\002";

   }

  }

  content = content.substring(0, content.length() - 1);

  try {

   table = new HTable(hb_conf, tablename);

   Put newPut = new Put(Bytes.toBytes(row));

   newPut.add(Bytes.toBytes("info"), Bytes.toBytes("content"),

     Bytes.toBytes(content));

   table.put(newPut);

  } catch (IOException e) {

   // TODO Auto-generated catch block

   e.printStackTrace();

  }

 }

 @Override

 public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e,

   Delete delete, WALEdit edit, Durability durability)

   throws IOException {

  // TODO Auto-generated method stub

  deleteSolr(delete);

  super.postDelete(e, delete, edit, durability);

 }

 public void deleteSolr(Delete delete) {

  CloudSolrServer cloudSolrServer;

  final String zkHost = "IP:2181,IP:2181,IP:2181";

  final int zkConnectTimeout = 1;

  cloudSolrServer = new CloudSolrServer(zkHost);

  cloudSolrServer.setZkConnectTimeout(zkConnectTimeout);

  cloudSolrServer.connect();

  cloudSolrServer.setDefaultCollection("student");

  Configuration hb_conf = HBaseConfiguration.create();

  HTable table;

  try {

   table = new HTable(hb_conf, "student");

   Get get = new Get(delete.getRow());

   Result result = table.get(get);

   if (result.size() == 0) {

    try {

     cloudSolrServer.deleteById(Bytes.toString(delete.getRow()));

     cloudSolrServer.commit(true, true, true);

     cloudSolrServer.shutdown();

    } catch (Exception e) {

     // TODO Auto-generated catch block

     cloudSolrServer.shutdown();

     deleteData("temp_student", get);

    }

   } else {

    SolrInputDocument doc = new SolrInputDocument();

    doc.addField("id", Bytes.toString(delete.getRow()));

    List<Cell> list_cell = result.listCells();

    for (int i = 0; i < list_cell.size(); i++) {

     Cell cell = list_cell.get(i);

     String key = Bytes.toString(cell.getQualifier());

     String value = Bytes.toString(cell.getValue());

     System.out.println("key = " + key + " value = " + value);

     doc.addField(key, value);

    }

    try {

     cloudSolrServer.add(doc);

     cloudSolrServer.commit(true, true, true);

     cloudSolrServer.shutdown();

    } catch (Exception e) {

     // TODO Auto-generated catch block

     cloudSolrServer.shutdown();

     addData("temp_student", result);

    }

   }

  } catch (IOException e) {

   // TODO Auto-generated catch block

  }

 }

 // 删除Solr出错,将数据放入备份表中

 public void deleteData(String tablename, Get get) {

  Configuration hb_conf = HBaseConfiguration.create();

  HTable table;

  long time = System.currentTimeMillis();

  String row = time + "";

  String operate = "delete";

  String content = operate;

  String rowkey = Bytes.toString(get.getRow());

  content = content + "\001" + rowkey;

  try {

   table = new HTable(hb_conf, tablename);

   Put newPut = new Put(Bytes.toBytes(row));

   newPut.add(Bytes.toBytes("info"), Bytes.toBytes("content"),

     Bytes.toBytes(content));

   table.put(newPut);

  } catch (IOException e) {

   // TODO Auto-generated catch block

   e.printStackTrace();

  }

 }

 // 更新Solr出错,将数据放入备份表中

 public void addData(String tablename, Result result) {

  Configuration hb_conf = HBaseConfiguration.create();

  HTable table;

  long time = System.currentTimeMillis();

  String row = time + "";

  String operate = "add";

  String content = operate;

  String rowkey = Bytes.toString(result.getRow());

  content = content + "\001" + rowkey;

  content=content+"\001";

  try {

   List<Cell> list_cell = result.listCells();

   for (int i = 0; i < list_cell.size(); i++) {

    Cell cell = list_cell.get(i);

    String key = Bytes.toString(cell.getQualifier());

    String value = Bytes.toString(cell.getValue());

    System.out.println("key = " + key + " value = " + value);

    content = content + key + "\003" + value + "\002";

   }

   content = content.substring(0, content.length() - 1);

   table = new HTable(hb_conf, tablename);

   Put newPut = new Put(Bytes.toBytes(row));

   newPut.add(Bytes.toBytes("info"), Bytes.toBytes("content"),

     Bytes.toBytes(content));

   table.put(newPut);

  } catch (IOException e) {

   // TODO Auto-generated catch block

   e.printStackTrace();

  }

 }

}

 

2、将协处理器的代码打包成jar包(Test_1.jar),然后放到HDFS上,或者放到每一台Hbase主机的同一文件目录下(本例子为:/home/hadoop/Test_1.jar)

3、将Solr的相关jar包放到每一台HBase主机的lib目录中

4、为HBase的相关表添加协处理器

(1)disable 'student'
(2)alter 'student','coprocessor'=>'hdfs://IP:9000/Test_1.jar|com.SolrCoprocessor|1001|'

 或者:

 alter 'student','coprocessor'=>'file:///home/hadoop/Test_1.jar|com.SolrCoprocessor|1001|'

(3)enable 'student'
5、附带:为表删除协处理器的命令为:

alter 'student',METHOD=>'table_att_unset',NAME=>'coprocessor$1'
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Solr HBase