您的位置:首页 > 其它

DataX学习笔记-Reader插件开发

2016-06-24 15:28 2001 查看
DataX开发基于读取ElasticSearch数据的Reader插件

1、检出DataX源码(git
clone https://github.com/alibaba/DataX.git DataX),导入项目,新建一个esreader的maven项目进行插件开发。

2、在DataX安装目录的plugins/reader目录下新建esreader目录,目录下包含plugin_job_template.json、plugin.json、esreader-0.0.1-SNAPSHOT.jar,同时在目录下创建一个libs目录,存放相关依赖的jar文件。

相关代码:

package com.alibaba.datax.plugin.reader.esreader;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.google.gson.Gson;
import com.umeng.es.config.EsServerAddress;

/**
 * DataX Reader plugin that scrolls every document out of one ElasticSearch
 * index/type (match-all query) and emits each document as a single
 * JSON-string column.
 *
 * <p>Job configuration keys are declared in {@code Key}; connection details
 * come from {@code esClusterName}/{@code esClusterIP}/{@code esClusterPort},
 * the data location from {@code esIndex}/{@code esType}, and the scroll page
 * size from {@code batchSize} (default 1000).
 */
public class ESReader extends Reader {

    public static class Job extends Reader.Job {

        private Configuration originalConfiguration = null;

        @Override
        public void preCheck() {
            super.preCheck();
        }

        @Override
        public void preHandler(Configuration jobConfiguration) {
            super.preHandler(jobConfiguration);
        }

        @Override
        public void init() {
            // Keep the raw job configuration; it is handed to the task split.
            this.originalConfiguration = super.getPluginJobConf();
        }

        @Override
        public void prepare() {
            super.prepare();
        }

        @Override
        public void post() {
            super.post();
        }

        @Override
        public void postHandler(Configuration jobConfiguration) {
            super.postHandler(jobConfiguration);
        }

        @Override
        public void destroy() {
        }

        /**
         * Returns the task configurations for this job.
         *
         * <p>BUGFIX: the previous implementation returned {@code adviceNumber}
         * references to the same configuration. Because this reader performs a
         * full match-all scroll and has no way to range-partition the index,
         * every such task re-read the entire index, duplicating every record
         * downstream whenever channel &gt; 1. An unsplittable reader must
         * return exactly one split.
         *
         * @param adviceNumber the channel count suggested by the framework
         *                     (ignored — see above)
         * @return a single-element list holding a defensive copy of the job
         *         configuration
         */
        @Override
        public List<Configuration> split(int adviceNumber) {
            List<Configuration> readerSplitConfiguration = new ArrayList<Configuration>();
            readerSplitConfiguration.add(this.originalConfiguration.clone());
            return readerSplitConfiguration;
        }

    }

    public static class Task extends Reader.Task {

        private static final Logger LOG = LoggerFactory.getLogger(Task.class);

        /** Scroll-context keep-alive in milliseconds between fetches. */
        private static final long SCROLL_TIMEOUT_MS = 60000L;

        private Configuration readerSliceConfiguration = null;

        private String esClusterName = null;

        // Single host, or a comma-separated host list; all hosts share esClusterPort.
        private String esClusterIP = null;

        private Integer esClusterPort = null;

        private String esIndex = null;

        private String esType = null;

        private Gson gson = null;

        private TransportClient client = null;

        // Scroll page size (documents fetched per round trip).
        private Integer batchSize = null;

        @Override
        public void preCheck() {
            super.preCheck();
        }

        @Override
        public void preHandler(Configuration jobConfiguration) {
            super.preHandler(jobConfiguration);
        }

        @Override
        public void init() {
            this.readerSliceConfiguration = super.getPluginJobConf();
            this.esClusterName = readerSliceConfiguration.getString(Key.esClusterName);
            this.esClusterIP = readerSliceConfiguration.getString(Key.esClusterIP);
            this.esClusterPort = readerSliceConfiguration.getInt(Key.esClusterPort, 9300);
            this.esIndex = readerSliceConfiguration.getString(Key.esIndex);
            this.esType = readerSliceConfiguration.getString(Key.esType);
            this.batchSize = readerSliceConfiguration.getInt(Key.batchSize, 1000);
            this.gson = new Gson();
        }

        @Override
        public void prepare() {
            super.prepare();
            // BUGFIX: the setting key was misspelled "client.tansport.sniff",
            // so cluster sniffing was silently never enabled.
            Settings settings = Settings.builder()
                    .put("cluster.name", esClusterName)
                    .put("client.transport.sniff", true)
                    .build();
            client = TransportClient.builder().settings(settings).build();
            // String.split already yields a one-element array when there is no
            // comma, so no special single-host case is needed. trim() tolerates
            // spaces around the commas in hand-written job files.
            for (String host : esClusterIP.split(",")) {
                client.addTransportAddress(new InetSocketTransportAddress(
                        new InetSocketAddress(host.trim(), esClusterPort)));
            }
        }

        @Override
        public void post() {
            super.post();
        }

        @Override
        public void postHandler(Configuration jobConfiguration) {
            super.postHandler(jobConfiguration);
        }

        @Override
        public void destroy() {
            // Guard: destroy() may be invoked even if prepare() never ran
            // (e.g. init failed), in which case client is still null.
            if (client != null) {
                client.close();
            }
        }

        /**
         * Scrolls the whole index and sends one record per document, each
         * record holding the document source serialized to JSON.
         *
         * <p>{@code setExplain(true)} was removed: scoring explanations are
         * expensive to compute and the result was never read.
         *
         * @param recordSender channel used to hand records to the writer
         */
        @Override
        public void startRead(RecordSender recordSender) {
            SearchResponse response = client.prepareSearch(esIndex)
                    .setTypes(esType)
                    .setQuery(QueryBuilders.matchAllQuery())
                    .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
                    .setScroll(new TimeValue(SCROLL_TIMEOUT_MS))
                    .setSize(batchSize)
                    .execute().actionGet();
            int totalSize = 0;
            while (true) {
                SearchHit[] hits = response.getHits().getHits();
                // An empty page means the scroll is exhausted.
                if (hits.length == 0) {
                    break;
                }
                for (SearchHit hit : hits) {
                    Record record = recordSender.createRecord();
                    record.addColumn(new StringColumn(gson.toJson(hit.getSource())));
                    recordSender.sendToWriter(record);
                }
                totalSize += hits.length;
                response = client.prepareSearchScroll(response.getScrollId())
                        .setScroll(new TimeValue(SCROLL_TIMEOUT_MS))
                        .execute().actionGet();
            }
            LOG.info("total size : {}", totalSize);
        }

    }

}

package com.alibaba.datax.plugin.reader.esreader;

/**
 * Configuration key names read by the esreader plugin from the DataX job
 * file. Pure constant holder — never instantiated.
 *
 * <p>Note: the lowerCamelCase constant names are kept as-is because the
 * reader code references them directly; renaming them to UPPER_SNAKE_CASE
 * would break callers.
 */
public final class Key {

    /** ElasticSearch cluster name ({@code cluster.name}). */
    public static final String esClusterName = "esClusterName";

    /** ElasticSearch cluster IP — a single host or a comma-separated list. */
    public static final String esClusterIP = "esClusterIP";

    /** ElasticSearch transport port (defaults to 9300 when absent). */
    public static final String esClusterPort = "esClusterPort";

    /** ElasticSearch index to read from. */
    public static final String esIndex = "esIndex";

    /** ElasticSearch document type within the index. */
    public static final String esType = "esType";

    /** Scroll batch size (defaults to 1000 when absent). */
    public static final String batchSize = "batchSize";

    private Key() {
        // Utility class: prevent instantiation.
    }

}

plugin_job_template.json

{
"name": "esreader",
"parameter": {
"esClusterName": "",
"esClusterIP": "",
"esClusterPort": "",
"esIndex": "",
"esType": "",
"batchSize": ""
}
}


plugin.json

{
"name": "esreader",
"class": "com.alibaba.datax.plugin.reader.esreader.ESReader",
"description": {
"useScene": "only for developer test.",
"mechanism": "use datax framework to transport elastic search data to channel.",
"warn": "Never use it in your real job."
},
"developer": "wulin"
}


3、根据python bin/datax.py -r esreader -w hdfswriter生成一个job/es_to_hdfs.json文件,填写相关内容。

{
"job": {
"content": [
{
"reader": {
"name": "esreader",
"parameter": {
"batchSize": "1000",
"esClusterIP": "192.168.0.114",
"esClusterName": "elasticsearch",
"esClusterPort": "9300",
"esIndex": "data",
"esType": "t1"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [{"name":"data","type":"string"}],
"defaultFS": "hdfs://192.168.0.114:9000",
"compress": "gzip",
"fieldDelimiter": ",",
"fileName": "esdata",
"fileType": "text",
"path": "/user/data/es",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}

4、执行python
bin/datax.py job/es_to_hdfs.json
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  datax elasticsearch reader