您的位置:首页 > 其它

从HBase读取数据提交到Solr建立索引

2013-12-09 19:58 344 查看
        从HBase中读取数据既可以直接调用HTable等api接口,也可以采用MapReduce的方式来读。如果数据表比较大,分成多个region来存储,后者可以显著提高数据读取效率。HBase提供了一个行统计程序RowCounter(org.apache.hadoop.hbase.mapreduce包)展示了如何通过MapReduce读取HBase示例,可作为参考。

        注意在读取时可以配置以下调优参数:

        conf.setBoolean("mapred.map.tasks.speculative.execution", false);

  当MapReduce数据源为HBase时,一般会设置Speculative Execution为false。因为HBase的本地存储机制,当前map处理的数据一般位于本机,若另外新建一个map,需要在网络中传输数据,浪费网络和I/O资源。

        conf.setInt("hbase.client.scanner.caching", 100);

  指定一次next()操作从服务器获取的数据条数,默认为1。适当提高该值可以显著提升性能,代价是内存占用的增加。

        scan.setCacheBlocks(false);

  Scan出来的数据一般是一次使用的,不必放在缓存中。

  在利用solrj添加索引时,可以每构建一个SolrInputDocument就执行add操作,它会触发http请求,将数据发送到solr服务器。为了提高效率,最好是批量add。在程序中用List缓存文档对象,数量达到参数“solr.commit.size”时,就批量add。注意在cleanup()中将List中余下的文档提交到服务器。程序中用到的参数写在了hbase-site.xml文件中。

public class HBaseIndexer {
public static final Logger LOG = LoggerFactory.getLogger(HBaseIndexer.class);

/** Mapper实现 */
static class SolrIndexerMapper extends TableMapper<Text, Text> {
public static enum Counters {ROWS}; // 用于计数器
private SolrServer solr; // 只创建一个SolrServer实例
private int commitSize;
private final List<SolrInputDocument> inputDocs = new ArrayList<SolrInputDocument>();

/** Called once at the beginning of the task. */
protected void setup(Context context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
solr = new HttpSolrServer(conf.get("solr.server"));
commitSize = conf.getInt("solr.commit.size", 1000); // 一次性添加的文档数,写在配置文件中
}

@Override
public void map(ImmutableBytesWritable row, Result values, Context context)
throws IOException {
SolrInputDocument solrDoc = new SolrInputDocument();
for (KeyValue kv : values.list()) {
String fieldName = Bytes.toString(kv.getQualifier()); // 直接将列标识符名作为域名
String fieldValue = Bytes.toString(kv.getValue());
solrDoc.addField(fieldName, fieldValue);
}
inputDocs.add(solrDoc);

if (inputDocs.size() >= commitSize) {
try {
LOG.info("添加文档:Adding " + Integer.toString(inputDocs.size()) + " documents");
solr.add(inputDocs); // 索引文档
} catch (final SolrServerException e) {
final IOException ioe = new IOException();
ioe.initCause(e);
throw ioe;
}
inputDocs.clear();
}
context.getCounter(Counters.ROWS).increment(1);
}

/** Called once at the end of the task. */
protected void cleanup(Context context) throws IOException, InterruptedException {
try {
if (!inputDocs.isEmpty()) {
LOG.info("清空队列:Adding " + Integer.toString(inputDocs.size()) + " documents");
solr.add(inputDocs);
inputDocs.clear();
}
} catch (final SolrServerException e) {
final IOException ioe = new IOException();
ioe.initCause(e);
throw ioe;
}
}
}

public static Job createSubmittableJob(Configuration conf, String tablename)
throws IOException {
Job job = new Job(conf, "SolrIndex_" + tablename);
job.setJarByClass(HBaseIndexer.class);

Scan scan = new Scan();
scan.setCacheBlocks(false); // scan的数据不放在缓存中,一次性的

/* 需要建索引的数据 */
scan.addColumn(Bytes.toBytes("baseinfo"), Bytes.toBytes("title"));
scan.addColumn(Bytes.toBytes("baseinfo"), Bytes.toBytes("user"));
scan.addColumn(Bytes.toBytes("baseinfo"), Bytes.toBytes("classlabel"));
scan.addColumn(Bytes.toBytes("baseinfo"), Bytes.toBytes("summary"));

job.setOutputFormatClass(NullOutputFormat.class);
TableMapReduceUtil.initTableMapperJob(tablename, scan,
SolrIndexerMapper.class, null, null, job); // 不需要输出,键、值类型为null
job.setNumReduceTasks(0); // 无reduce任务
return job;
}

private static void printUsage(String errorMessage) {
System.err.println("ERROR: " + errorMessage);
System.err.println("Usage: VideoIndexer <tablename>");
}

/** Main entry point. */
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.setBoolean("mapred.map.tasks.speculative.execution", false);
conf.setInt("hbase.client.scanner.caching", 100);

if (args.length < 1) {
printUsage("Wrong number of parameters: " + args.length);
System.exit(-1);
}
String tablename = args[0];

Job job = createSubmittableJob(conf, tablename);
if (job == null) {
System.exit(-1);
}
job.waitForCompletion(true);
Counter counter = job.getCounters().findCounter(SolrIndexerMapper.Counters.ROWS);
LOG.info("Put " + counter.getValue() + " records to Solr!"); // 打印日志
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Solr Hbase