DataX学习笔记-Reader插件开发
2016-06-24 15:28
2001 查看
DataX开发基于读取ElasticSearch数据的Reader插件
1、检出DataX源码(git clone https://github.com/alibaba/DataX.git DataX),导入项目,新建一个esreader的maven项目进行插件开发。
2、在DataX安装目录的plugins/reader目录下新建esreader目录,目录下包含plugin_job_template.json、plugin.json、esreader-0.0.1-SNAPSHOT.jar,同时在目录下创建一个libs目录,存放相关依赖的jar文件。
相关代码:
package com.alibaba.datax.plugin.reader.esreader;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.google.gson.Gson;
import com.umeng.es.config.EsServerAddress;
/**
 * DataX Reader plugin that streams every document of a configured
 * Elasticsearch index/type into the DataX channel. Each hit is emitted as a
 * single-column record whose value is the document source serialized to JSON.
 */
public class ESReader extends Reader {

    /**
     * Job-side plugin: holds the user configuration and hands out per-task
     * slices.
     */
    public static class Job extends Reader.Job {

        private Configuration originalConfiguration = null;

        @Override
        public void preCheck() {
            super.preCheck();
        }

        @Override
        public void preHandler(Configuration jobConfiguration) {
            super.preHandler(jobConfiguration);
        }

        @Override
        public void init() {
            this.originalConfiguration = super.getPluginJobConf();
        }

        @Override
        public void prepare() {
            super.prepare();
        }

        @Override
        public void post() {
            super.post();
        }

        @Override
        public void postHandler(Configuration jobConfiguration) {
            super.postHandler(jobConfiguration);
        }

        @Override
        public void destroy() {
            // Nothing to release on the job side.
        }

        @Override
        public List<Configuration> split(int adviceNumber) {
            // NOTE(review): every split receives the SAME configuration, so
            // with channel > 1 each task scans the full index and records are
            // duplicated downstream. Keep channel = 1 (as in the sample job),
            // or partition the query per split before raising the channel count.
            List<Configuration> readerSplitConfiguration = new ArrayList<Configuration>();
            for (int i = 0; i < adviceNumber; i++) {
                readerSplitConfiguration.add(this.originalConfiguration);
            }
            return readerSplitConfiguration;
        }
    }

    /**
     * Task-side plugin: connects to the ES cluster and scrolls through all
     * documents, sending each one to the writer as a JSON string column.
     */
    public static class Task extends Reader.Task {

        private static final Logger LOG = LoggerFactory.getLogger(Task.class);

        /** Scroll keep-alive in milliseconds between fetches. */
        private static final long SCROLL_KEEP_ALIVE_MILLIS = 60000L;

        private Configuration readerSliceConfiguration = null;
        private String esClusterName = null;
        private String esClusterIP = null;
        private Integer esClusterPort = null;
        private String esIndex = null;
        private String esType = null;
        private Gson gson = null;
        private TransportClient client = null;
        private Integer batchSize = null;

        @Override
        public void preCheck() {
            super.preCheck();
        }

        @Override
        public void preHandler(Configuration jobConfiguration) {
            super.preHandler(jobConfiguration);
        }

        @Override
        public void init() {
            // Read connection and paging settings from the task slice.
            this.readerSliceConfiguration = super.getPluginJobConf();
            this.esClusterName = readerSliceConfiguration.getString(Key.esClusterName);
            this.esClusterIP = readerSliceConfiguration.getString(Key.esClusterIP);
            this.esClusterPort = readerSliceConfiguration.getInt(Key.esClusterPort, 9300);
            this.esIndex = readerSliceConfiguration.getString(Key.esIndex);
            this.esType = readerSliceConfiguration.getString(Key.esType);
            this.batchSize = readerSliceConfiguration.getInt(Key.batchSize, 1000);
            this.gson = new Gson();
        }

        @Override
        public void prepare() {
            super.prepare();
            // BUGFIX: the sniff setting key was misspelled "client.tansport.sniff",
            // so cluster sniffing was silently never enabled.
            Settings settings = Settings.builder()
                    .put("cluster.name", esClusterName)
                    .put("client.transport.sniff", true)
                    .build();
            client = TransportClient.builder().settings(settings).build();
            // esClusterIP may be one host or a comma-separated host list; all
            // hosts share the single configured transport port.
            // (String.split returns the whole string when no comma is present,
            // so the separate single-host branch was redundant.)
            for (String host : esClusterIP.split(",")) {
                client.addTransportAddress(new InetSocketTransportAddress(
                        new InetSocketAddress(host, esClusterPort)));
            }
        }

        @Override
        public void post() {
            super.post();
        }

        @Override
        public void postHandler(Configuration jobConfiguration) {
            super.postHandler(jobConfiguration);
        }

        @Override
        public void destroy() {
            // BUGFIX: guard against prepare() having failed before the client
            // was created, which previously caused an NPE here.
            if (client != null) {
                client.close();
            }
        }

        @Override
        public void startRead(RecordSender recordSender) {
            // Open a scroll over the whole index. setExplain(true) was removed:
            // scoring explanations were computed for every hit but never read.
            SearchResponse response = client.prepareSearch(esIndex)
                    .setTypes(esType)
                    .setQuery(QueryBuilders.matchAllQuery())
                    .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
                    .setScroll(new TimeValue(SCROLL_KEEP_ALIVE_MILLIS))
                    .setSize(batchSize)
                    .execute().actionGet();
            int totalSize = 0;
            while (true) {
                SearchHit[] hits = response.getHits().getHits();
                if (hits.length == 0) {
                    break; // scroll exhausted
                }
                for (SearchHit hit : hits) {
                    Record record = recordSender.createRecord();
                    record.addColumn(new StringColumn(gson.toJson(hit.getSource())));
                    recordSender.sendToWriter(record);
                }
                totalSize += hits.length;
                response = client.prepareSearchScroll(response.getScrollId())
                        .setScroll(new TimeValue(SCROLL_KEEP_ALIVE_MILLIS))
                        .execute().actionGet();
            }
            // Release the server-side scroll context instead of waiting for it
            // to time out.
            client.prepareClearScroll().addScrollId(response.getScrollId())
                    .execute().actionGet();
            LOG.info("total size : {}", totalSize);
        }
    }
}
package com.alibaba.datax.plugin.reader.esreader;
/**
 * Configuration key names recognized by the esreader plugin.
 *
 * <p>Each constant's value is the literal key expected in the job JSON's
 * {@code parameter} object.
 */
public final class Key {

    private Key() {
        // Constant holder; never instantiated.
    }

    /** Elasticsearch cluster name ({@code cluster.name} setting). */
    public final static String esClusterName = "esClusterName";

    /** Elasticsearch cluster host, or comma-separated list of hosts. */
    public final static String esClusterIP = "esClusterIP";

    /** Elasticsearch transport port (default 9300). */
    public final static String esClusterPort = "esClusterPort";

    /** Index to read from. */
    public final static String esIndex = "esIndex";

    /** Document type to read. */
    public final static String esType = "esType";

    /** Scroll page size (default 1000). */
    public final static String batchSize = "batchSize";
}
plugin_job_template.json
{
"name": "esreader",
"parameter": {
"esClusterName": "",
"esClusterIP": "",
"esClusterPort": "",
"esIndex": "",
"esType": "",
"batchSize": ""
}
}
plugin.json
{
"name": "esreader",
"class": "com.alibaba.datax.plugin.reader.esreader.ESReader",
"description": {
"useScene": "only for developer test.",
"mechanism": "use datax framework to transport elastic search data to channel.",
"warn": "Never use it in your real job."
},
"developer": "wulin"
}
3、根据python bin/datax.py -r esreader -w hdfswriter生成一个job/es_to_hdfs.json文件,填写相关内容。
{
"job": {
"content": [
{
"reader": {
"name": "esreader",
"parameter": {
"batchSize": "1000",
"esClusterIP": "192.168.0.114",
"esClusterName": "elasticsearch",
"esClusterPort": "9300",
"esIndex": "data",
"esType": "t1"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [{"name":"data","type":"string"}],
"defaultFS": "hdfs://192.168.0.114:9000",
"compress": "gzip",
"fieldDelimiter": ",",
"fileName": "esdata",
"fileType": "text",
"path": "/user/data/es",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
4、执行python bin/datax.py job/es_to_hdfs.json
1、检出DataX源码(git clone https://github.com/alibaba/DataX.git DataX),导入项目,新建一个esreader的maven项目进行插件开发。
2、在DataX安装目录的plugins/reader目录下新建esreader目录,目录下包含plugin_job_template.json、plugin.json、esreader-0.0.1-SNAPSHOT.jar,同时在目录下创建一个libs目录,存放相关依赖的jar文件。
相关代码:
package com.alibaba.datax.plugin.reader.esreader;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.google.gson.Gson;
import com.umeng.es.config.EsServerAddress;
/**
 * DataX Reader plugin that streams every document of a configured
 * Elasticsearch index/type into the DataX channel. Each hit is emitted as a
 * single-column record whose value is the document source serialized to JSON.
 */
public class ESReader extends Reader {

    /**
     * Job-side plugin: holds the user configuration and hands out per-task
     * slices.
     */
    public static class Job extends Reader.Job {

        private Configuration originalConfiguration = null;

        @Override
        public void preCheck() {
            super.preCheck();
        }

        @Override
        public void preHandler(Configuration jobConfiguration) {
            super.preHandler(jobConfiguration);
        }

        @Override
        public void init() {
            this.originalConfiguration = super.getPluginJobConf();
        }

        @Override
        public void prepare() {
            super.prepare();
        }

        @Override
        public void post() {
            super.post();
        }

        @Override
        public void postHandler(Configuration jobConfiguration) {
            super.postHandler(jobConfiguration);
        }

        @Override
        public void destroy() {
            // Nothing to release on the job side.
        }

        @Override
        public List<Configuration> split(int adviceNumber) {
            // NOTE(review): every split receives the SAME configuration, so
            // with channel > 1 each task scans the full index and records are
            // duplicated downstream. Keep channel = 1 (as in the sample job),
            // or partition the query per split before raising the channel count.
            List<Configuration> readerSplitConfiguration = new ArrayList<Configuration>();
            for (int i = 0; i < adviceNumber; i++) {
                readerSplitConfiguration.add(this.originalConfiguration);
            }
            return readerSplitConfiguration;
        }
    }

    /**
     * Task-side plugin: connects to the ES cluster and scrolls through all
     * documents, sending each one to the writer as a JSON string column.
     */
    public static class Task extends Reader.Task {

        private static final Logger LOG = LoggerFactory.getLogger(Task.class);

        /** Scroll keep-alive in milliseconds between fetches. */
        private static final long SCROLL_KEEP_ALIVE_MILLIS = 60000L;

        private Configuration readerSliceConfiguration = null;
        private String esClusterName = null;
        private String esClusterIP = null;
        private Integer esClusterPort = null;
        private String esIndex = null;
        private String esType = null;
        private Gson gson = null;
        private TransportClient client = null;
        private Integer batchSize = null;

        @Override
        public void preCheck() {
            super.preCheck();
        }

        @Override
        public void preHandler(Configuration jobConfiguration) {
            super.preHandler(jobConfiguration);
        }

        @Override
        public void init() {
            // Read connection and paging settings from the task slice.
            this.readerSliceConfiguration = super.getPluginJobConf();
            this.esClusterName = readerSliceConfiguration.getString(Key.esClusterName);
            this.esClusterIP = readerSliceConfiguration.getString(Key.esClusterIP);
            this.esClusterPort = readerSliceConfiguration.getInt(Key.esClusterPort, 9300);
            this.esIndex = readerSliceConfiguration.getString(Key.esIndex);
            this.esType = readerSliceConfiguration.getString(Key.esType);
            this.batchSize = readerSliceConfiguration.getInt(Key.batchSize, 1000);
            this.gson = new Gson();
        }

        @Override
        public void prepare() {
            super.prepare();
            // BUGFIX: the sniff setting key was misspelled "client.tansport.sniff",
            // so cluster sniffing was silently never enabled.
            Settings settings = Settings.builder()
                    .put("cluster.name", esClusterName)
                    .put("client.transport.sniff", true)
                    .build();
            client = TransportClient.builder().settings(settings).build();
            // esClusterIP may be one host or a comma-separated host list; all
            // hosts share the single configured transport port.
            // (String.split returns the whole string when no comma is present,
            // so the separate single-host branch was redundant.)
            for (String host : esClusterIP.split(",")) {
                client.addTransportAddress(new InetSocketTransportAddress(
                        new InetSocketAddress(host, esClusterPort)));
            }
        }

        @Override
        public void post() {
            super.post();
        }

        @Override
        public void postHandler(Configuration jobConfiguration) {
            super.postHandler(jobConfiguration);
        }

        @Override
        public void destroy() {
            // BUGFIX: guard against prepare() having failed before the client
            // was created, which previously caused an NPE here.
            if (client != null) {
                client.close();
            }
        }

        @Override
        public void startRead(RecordSender recordSender) {
            // Open a scroll over the whole index. setExplain(true) was removed:
            // scoring explanations were computed for every hit but never read.
            SearchResponse response = client.prepareSearch(esIndex)
                    .setTypes(esType)
                    .setQuery(QueryBuilders.matchAllQuery())
                    .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
                    .setScroll(new TimeValue(SCROLL_KEEP_ALIVE_MILLIS))
                    .setSize(batchSize)
                    .execute().actionGet();
            int totalSize = 0;
            while (true) {
                SearchHit[] hits = response.getHits().getHits();
                if (hits.length == 0) {
                    break; // scroll exhausted
                }
                for (SearchHit hit : hits) {
                    Record record = recordSender.createRecord();
                    record.addColumn(new StringColumn(gson.toJson(hit.getSource())));
                    recordSender.sendToWriter(record);
                }
                totalSize += hits.length;
                response = client.prepareSearchScroll(response.getScrollId())
                        .setScroll(new TimeValue(SCROLL_KEEP_ALIVE_MILLIS))
                        .execute().actionGet();
            }
            // Release the server-side scroll context instead of waiting for it
            // to time out.
            client.prepareClearScroll().addScrollId(response.getScrollId())
                    .execute().actionGet();
            LOG.info("total size : {}", totalSize);
        }
    }
}
package com.alibaba.datax.plugin.reader.esreader;
/**
 * Configuration key names recognized by the esreader plugin.
 *
 * <p>Each constant's value is the literal key expected in the job JSON's
 * {@code parameter} object.
 */
public final class Key {

    private Key() {
        // Constant holder; never instantiated.
    }

    /** Elasticsearch cluster name ({@code cluster.name} setting). */
    public final static String esClusterName = "esClusterName";

    /** Elasticsearch cluster host, or comma-separated list of hosts. */
    public final static String esClusterIP = "esClusterIP";

    /** Elasticsearch transport port (default 9300). */
    public final static String esClusterPort = "esClusterPort";

    /** Index to read from. */
    public final static String esIndex = "esIndex";

    /** Document type to read. */
    public final static String esType = "esType";

    /** Scroll page size (default 1000). */
    public final static String batchSize = "batchSize";
}
plugin_job_template.json
{
"name": "esreader",
"parameter": {
"esClusterName": "",
"esClusterIP": "",
"esClusterPort": "",
"esIndex": "",
"esType": "",
"batchSize": ""
}
}
plugin.json
{
"name": "esreader",
"class": "com.alibaba.datax.plugin.reader.esreader.ESReader",
"description": {
"useScene": "only for developer test.",
"mechanism": "use datax framework to transport elastic search data to channel.",
"warn": "Never use it in your real job."
},
"developer": "wulin"
}
3、根据python bin/datax.py -r esreader -w hdfswriter生成一个job/es_to_hdfs.json文件,填写相关内容。
{
"job": {
"content": [
{
"reader": {
"name": "esreader",
"parameter": {
"batchSize": "1000",
"esClusterIP": "192.168.0.114",
"esClusterName": "elasticsearch",
"esClusterPort": "9300",
"esIndex": "data",
"esType": "t1"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [{"name":"data","type":"string"}],
"defaultFS": "hdfs://192.168.0.114:9000",
"compress": "gzip",
"fieldDelimiter": ",",
"fileName": "esdata",
"fileType": "text",
"path": "/user/data/es",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
4、执行python bin/datax.py job/es_to_hdfs.json
相关文章推荐
- 让Adobe Reader 7.0 8.1 记住您上次阅读的位置
- 巧用mysql提示符prompt清晰管理数据库的方法
- 两大步骤教您开启MySQL 数据库远程登陆帐号的方法
- 深入多线程之:Reader与Write Locks(读写锁)的使用详解
- phpmyadmin 4+ 访问慢的解决方法
- linux系统下实现mysql热备份详细步骤(mysql主从复制)
- CentOS 5.5下安装MySQL 5.5全过程分享
- MySQL复制的概述、安装、故障、技巧、工具(火丁分享)
- MySQL中删除重复数据的简单方法
- MySQL5.5.21安装配置教程(win7)
- InputStreamReader 和FileReader的区别及InputStream和Reader的区别
- 使用ElasticSearch6.0快速实现全文搜索功能的示例代码
- elasticsearch批量数据导入和导出
- 使用ElasticSearch+LogStash+Kibana+Redis搭建日志管理服务
- ElasticSearch 使用心得
- ES中如何使用逗号来分词
- ElasticSearch 守护进程 JSW
- elasticsearch2.3安装以及集群部署
- elasticsearch增删改查
- Elasticsearch2.2.0数据操作