您的位置:首页 > 运维架构

ES-Hadoop学习笔记-初识

2016-06-27 00:30 639 查看
ES-Hadoop是连接快速查询和大数据分析的桥梁,它能够无缝地在Hadoop和ElasticSearch之间移动数据。ES-Hadoop把Hadoop中的数据索引到Elasticsearch,充分利用其查询速度和强大的聚合能力,使分析比以往更快,同时也可以使用HDFS作为Elasticsearch的长期存档。ES-Hadoop可以原生集成Hadoop生态系统中的很多流行组件,比如Spark、Hive、Pig、Storm、MapReduce等。官方有张图可以很好地说明:



下面直接看一个简单的ES与Hadoop之间数据移动的实例

项目依赖的jar包如下

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>2.3.2</version>
</dependency>


ElasticSearch到Hadoop最简单的实例

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.LinkedMapWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * MapReduce driver that reads the Elasticsearch resource {@code job/51/} through
 * {@link EsInputFormat} and writes the mapper output to the output directory given
 * on the command line.
 *
 * <p>Usage: {@code hadoop jar eshadoop.jar E2HJob01 <output path>}
 */
public class E2HJob01 {

    private static final Logger LOG = LoggerFactory.getLogger(E2HJob01.class);

    /**
     * Configures and runs the job, then blocks until it finishes.
     *
     * <p>Exit status: {@code 2} when the output path argument is missing or extra
     * arguments are supplied, {@code 1} when the job reports failure or an exception
     * is thrown. This lets calling scripts detect a failed export instead of always
     * seeing status 0.
     *
     * @param args generic Hadoop options followed by exactly one output path
     */
    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            // Speculative execution is switched off for both phases, presumably so
            // duplicate task attempts do not hit Elasticsearch twice.
            conf.setBoolean("mapreduce.map.speculative", false);
            conf.setBoolean("mapreduce.reduce.speculative", false);
            // Elasticsearch node to connect to
            conf.set("es.nodes", "centos.host1:9200");
            // Elasticsearch index/type to read from
            conf.set("es.resource", "job/51/");

            String[] oArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (oArgs.length != 1) {
                LOG.error("Usage: E2HJob01 <output path>");
                System.exit(2);
            }

            Job job = Job.getInstance(conf, "51JOBE2H01");
            job.setJarByClass(E2HJob01.class);
            job.setInputFormatClass(EsInputFormat.class);
            job.setMapperClass(E2HMapper01.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LinkedMapWritable.class);

            FileOutputFormat.setOutputPath(job, new Path(oArgs[0]));

            boolean succeeded = job.waitForCompletion(true);
            System.out.println(succeeded);
            if (!succeeded) {
                // Propagate job failure to the caller through the exit status.
                System.exit(1);
            }
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            System.exit(1);
        }
    }

}

/**
 * Pass-through mapper: logs every incoming key/value pair and emits it unchanged.
 *
 * <p>The lifecycle hooks are inherited from {@link Mapper} as-is; this class adds no
 * setup or cleanup work of its own.
 */
class E2HMapper01 extends Mapper<Text, LinkedMapWritable, Text, LinkedMapWritable> {

    private static final Logger LOG = LoggerFactory.getLogger(E2HMapper01.class);

    /**
     * Logs the record and forwards it to the output without modification.
     *
     * @param key     record key delivered by the input format
     * @param value   record value delivered by the input format
     * @param context task context used to emit the pair
     */
    @Override
    protected void map(Text key, LinkedMapWritable value, Context context)
            throws IOException, InterruptedException {
        LOG.info("key {} value {}", key, value);
        context.write(key, value);
    }

}


hadoop jar eshadoop.jar E2HJob01 /user/data/es/job/
从Hadoop上的数据文件可以看到,第一列是ES的doc id,第二列是doc data。
也可以添加ES查询条件,实例如下:
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.LinkedMapWritable;
import org.platform.eshadoop.modules.examples.writable.JobWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * MapReduce driver that reads the Elasticsearch resource {@code job/51/}, restricted
 * by the query {@code ?q=高*}, and writes the mapped {@link JobWritable} records to
 * the output directory given on the command line.
 *
 * <p>Usage: {@code hadoop jar eshadoop.jar E2HJob02 <output path>}
 */
public class E2HJob02 {

    private static final Logger LOG = LoggerFactory.getLogger(E2HJob02.class);

    /**
     * Configures and runs the job, then blocks until it finishes.
     *
     * <p>Exit status: {@code 2} when the output path argument is missing or extra
     * arguments are supplied, {@code 1} when the job reports failure or an exception
     * is thrown. This lets calling scripts detect a failed export instead of always
     * seeing status 0.
     *
     * @param args generic Hadoop options followed by exactly one output path
     */
    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            // Speculative execution is switched off for both phases, presumably so
            // duplicate task attempts do not hit Elasticsearch twice.
            conf.setBoolean("mapreduce.map.speculative", false);
            conf.setBoolean("mapreduce.reduce.speculative", false);
            // Elasticsearch node to connect to
            conf.set("es.nodes", "centos.host1:9200");
            // Elasticsearch index/type to read from
            conf.set("es.resource", "job/51/");
            // URI-style query applied on the Elasticsearch side
            conf.set("es.query", "?q=高*");

            String[] oArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (oArgs.length != 1) {
                LOG.error("Usage: E2HJob02 <output path>");
                System.exit(2);
            }

            Job job = Job.getInstance(conf, "51JOBE2H02");
            job.setJarByClass(E2HJob02.class);
            job.setInputFormatClass(EsInputFormat.class);
            job.setMapperClass(E2HMapper02.class);
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(JobWritable.class);

            FileOutputFormat.setOutputPath(job, new Path(oArgs[0]));

            boolean succeeded = job.waitForCompletion(true);
            System.out.println(succeeded);
            if (!succeeded) {
                // Propagate job failure to the caller through the exit status.
                System.exit(1);
            }
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            System.exit(1);
        }
    }

}

/**
 * Converts each incoming document into a {@link JobWritable} and emits it with a
 * {@link NullWritable} key.
 *
 * <p>The lifecycle hooks are inherited from {@link Mapper} as-is; this class adds no
 * setup or cleanup work of its own.
 */
class E2HMapper02 extends Mapper<Text, LinkedMapWritable, NullWritable, JobWritable> {

    private static final Logger LOG = LoggerFactory.getLogger(E2HMapper02.class);

    /**
     * Builds a {@link JobWritable} from the document fields. A field is copied only
     * when its string form is not blank; otherwise the record keeps whatever value
     * {@code JobWritable} starts with.
     *
     * @param key     record key, stored as the record id
     * @param value   document fields keyed by field name
     * @param context task context used to emit the record
     */
    @Override
    protected void map(Text key, LinkedMapWritable value, Context context)
            throws IOException, InterruptedException {
        // Flatten the document to "field name -> string value", logging every field.
        Map<String, String> fields = new HashMap<String, String>();
        for (Entry<Writable, Writable> field : value.entrySet()) {
            LOG.info("key {} value {}", field.getKey(), field.getValue());
            fields.put(field.getKey().toString(), field.getValue().toString());
        }

        JobWritable record = new JobWritable();
        record.setId(key);

        Text jobName = textOrNull(fields.get("jobName"));
        if (jobName != null) {
            record.setJobName(jobName);
        }
        Text jobUrl = textOrNull(fields.get("jobUrl"));
        if (jobUrl != null) {
            record.setJobUrl(jobUrl);
        }
        Text companyName = textOrNull(fields.get("companyName"));
        if (companyName != null) {
            record.setCompanyName(companyName);
        }
        Text companyUrl = textOrNull(fields.get("companyUrl"));
        if (companyUrl != null) {
            record.setCompanyUrl(companyUrl);
        }
        Text salary = textOrNull(fields.get("salary"));
        if (salary != null) {
            record.setSalary(salary);
        }
        Text workPlace = textOrNull(fields.get("workPlace"));
        if (workPlace != null) {
            record.setWorkPlace(workPlace);
        }
        Text contact = textOrNull(fields.get("contact"));
        if (contact != null) {
            record.setContact(contact);
        }
        Text welfare = textOrNull(fields.get("welfare"));
        if (welfare != null) {
            record.setWelfare(welfare);
        }

        context.write(NullWritable.get(), record);
    }

    /** Wraps {@code raw} in a new {@link Text}, or returns {@code null} when it is blank. */
    private static Text textOrNull(String raw) {
        return StringUtils.isNotBlank(raw) ? new Text(raw) : null;
    }

}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class JobWritable implements Writable, Cloneable {

private Text id = null;

private Text jobName = null;

private Text jobUrl = null;

private Text companyName = null;

private Text companyUrl = null;

private Text salary = null;

private Text workPlace = null;

private Text contact = null;

private Text welfare = null;

public JobWritable() {
id = new Text();
jobName = new Text();
jobUrl = new Text();
companyName = new Text();
companyUrl = new Text();
salary = new Text();
workPlace = new Text();
contact = new Text();
welfare = new Text();
}

public void readFields(DataInput dataInput) throws IOException {
id.readFields(dataInput);
jobName.readFields(dataInput);
jobUrl.readFields(dataInput);
companyName.readFields(dataInput);
companyUrl.readFields(dataInput);
salary.readFields(dataInput);
workPlace.readFields(dataInput);
contact.readFields(dataInput);
welfare.readFields(dataInput);
}

public void write(DataOutput dataOutput) throws IOException {
id.write(dataOutput);
jobName.write(dataOutput);
jobUrl.write(dataOutput);
companyName.write(dataOutput);
companyUrl.write(dataOutput);
salary.write(dataOutput);
workPlace.write(dataOutput);
contact.write(dataOutput);
welfare.write(dataOutput);
}

public Text getId() {
return id;
}

public void setId(Text id) {
this.id = id;
}

public Text getJobName() {
return jobName;
}

public void setJobName(Text jobName) {
this.jobName = jobName;
}

public Text getJobUrl() {
return jobUrl;
}

public void setJobUrl(Text jobUrl) {
this.jobUrl = jobUrl;
}

public Text getCompanyName() {
return companyName;
}

public void setCompanyName(Text companyName) {
this.companyName = companyName;
}

public Text getCompanyUrl() {
return companyUrl;
}

public void setCompanyUrl(Text companyUrl) {
this.companyUrl = companyUrl;
}

public Text getSalary() {
return salary;
}

public void setSalary(Text salary) {
this.salary = salary;
}

public Text getWorkPlace() {
return workPlace;
}

public void setWorkPlace(Text workPlace) {
this.workPlace = workPlace;
}

public Text getContact() {
return contact;
}

public void setContact(Text contact) {
this.contact = contact;
}

public Text getWelfare() {
return welfare;
}

public void setWelfare(Text welfare) {
this.welfare = welfare;
}

@Override
public String toString() {
return id + ":" + jobName + ":" + jobUrl + ":" + companyName + ":" + companyUrl +
":" + salary + ":" + workPlace + ":" + contact + ":" + welfare;
}

}

下面这个实例是每行直接以json格式存储在hadoop上

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import org.elasticsearch.hadoop.mr.LinkedMapWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;

/**
 * MapReduce driver that reads the Elasticsearch resource {@code job/51/} and writes
 * the mapper output (one JSON string per record, produced by {@code E2HMapper03})
 * to the output directory given on the command line.
 *
 * <p>Usage: {@code hadoop jar eshadoop.jar E2HJob03 <output path>}
 */
public class E2HJob03 {

    private static final Logger LOG = LoggerFactory.getLogger(E2HJob03.class);

    /**
     * Configures and runs the job, then blocks until it finishes.
     *
     * <p>Exit status: {@code 2} when the output path argument is missing or extra
     * arguments are supplied, {@code 1} when the job reports failure or an exception
     * is thrown. This lets calling scripts detect a failed export instead of always
     * seeing status 0.
     *
     * @param args generic Hadoop options followed by exactly one output path
     */
    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            // Speculative execution is switched off for both phases, presumably so
            // duplicate task attempts do not hit Elasticsearch twice.
            conf.setBoolean("mapreduce.map.speculative", false);
            conf.setBoolean("mapreduce.reduce.speculative", false);
            // Elasticsearch node to connect to
            conf.set("es.nodes", "centos.host1:9200");
            // Elasticsearch index/type to read from
            conf.set("es.resource", "job/51/");

            String[] oArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (oArgs.length != 1) {
                LOG.error("Usage: E2HJob03 <output path>");
                System.exit(2);
            }

            Job job = Job.getInstance(conf, "51JOBE2H03");
            job.setJarByClass(E2HJob03.class);
            job.setInputFormatClass(EsInputFormat.class);
            job.setMapperClass(E2HMapper03.class);
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(Text.class);

            FileOutputFormat.setOutputPath(job, new Path(oArgs[0]));

            boolean succeeded = job.waitForCompletion(true);
            System.out.println(succeeded);
            if (!succeeded) {
                // Propagate job failure to the caller through the exit status.
                System.exit(1);
            }
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            System.exit(1);
        }
    }

}

/**
 * Converts each incoming document into a {@link JobInfo}, serializes it with Gson and
 * emits the JSON string as a {@link Text} value with a {@link NullWritable} key.
 */
class E2HMapper03 extends Mapper<Text, LinkedMapWritable, NullWritable, Text> {

    // Fixed: the logger was created for E2HMapper02.class (copy-paste slip), which
    // filed this mapper's log lines under the wrong category and made this listing
    // depend on a class from another example.
    private static final Logger LOG = LoggerFactory.getLogger(E2HMapper03.class);

    /** JSON serializer, created in {@link #setup} before any record is mapped. */
    private Gson gson = null;

    /** Creates the Gson instance used by {@link #map}. */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        gson = new Gson();
    }

    /**
     * Copies the known fields of the document into a {@link JobInfo} and writes its
     * JSON form. Fields absent from the document are left {@code null} on the
     * {@code JobInfo}.
     *
     * @param key     record key, stored as the {@code JobInfo} id
     * @param value   document fields keyed by field name
     * @param context task context used to emit the JSON line
     */
    @Override
    protected void map(Text key, LinkedMapWritable value, Context context)
            throws IOException, InterruptedException {
        JobInfo jobInfo = new JobInfo();
        jobInfo.setId(key.toString());
        // Flatten the document to "field name -> string value", logging every field.
        Map<String, String> map = new HashMap<String, String>();
        for (Entry<Writable, Writable> entry : value.entrySet()) {
            LOG.info("key {} value {}", entry.getKey(), entry.getValue());
            map.put(entry.getKey().toString(), entry.getValue().toString());
        }
        jobInfo.setJobName(map.get("jobName"));
        jobInfo.setJobUrl(map.get("jobUrl"));
        jobInfo.setCompanyName(map.get("companyName"));
        jobInfo.setCompanyUrl(map.get("companyUrl"));
        jobInfo.setSalary(map.get("salary"));
        jobInfo.setWorkPlace(map.get("workPlace"));
        jobInfo.setContact(map.get("contact"));
        jobInfo.setWelfare(map.get("welfare"));
        context.write(NullWritable.get(), new Text(gson.toJson(jobInfo)));
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
    }

}

/**
 * Plain mutable bean describing one job posting. Every property is a nullable
 * {@code String} that starts out as {@code null}.
 *
 * <p>Field names and declaration order are kept as they are because
 * {@code E2HMapper03} serializes this class with Gson.
 */
class JobInfo {

    private String id;
    private String jobName;
    private String jobUrl;
    private String companyName;
    private String companyUrl;
    private String salary;
    private String workPlace;
    private String contact;
    private String welfare;

    /** @return the id, or {@code null} if unset */
    public String getId() {
        return this.id;
    }

    public void setId(String id) {
        this.id = id;
    }

    /** @return the job name, or {@code null} if unset */
    public String getJobName() {
        return this.jobName;
    }

    public void setJobName(String jobName) {
        this.jobName = jobName;
    }

    /** @return the job URL, or {@code null} if unset */
    public String getJobUrl() {
        return this.jobUrl;
    }

    public void setJobUrl(String jobUrl) {
        this.jobUrl = jobUrl;
    }

    /** @return the company name, or {@code null} if unset */
    public String getCompanyName() {
        return this.companyName;
    }

    public void setCompanyName(String companyName) {
        this.companyName = companyName;
    }

    /** @return the company URL, or {@code null} if unset */
    public String getCompanyUrl() {
        return this.companyUrl;
    }

    public void setCompanyUrl(String companyUrl) {
        this.companyUrl = companyUrl;
    }

    /** @return the salary text, or {@code null} if unset */
    public String getSalary() {
        return this.salary;
    }

    public void setSalary(String salary) {
        this.salary = salary;
    }

    /** @return the work place, or {@code null} if unset */
    public String getWorkPlace() {
        return this.workPlace;
    }

    public void setWorkPlace(String workPlace) {
        this.workPlace = workPlace;
    }

    /** @return the contact, or {@code null} if unset */
    public String getContact() {
        return this.contact;
    }

    public void setContact(String contact) {
        this.contact = contact;
    }

    /** @return the welfare text, or {@code null} if unset */
    public String getWelfare() {
        return this.welfare;
    }

    public void setWelfare(String welfare) {
        this.welfare = welfare;
    }

}

接下来的实例是将hadoop上的数据移动到ElasticSearch上索引,这里直接用上面存储的JSON数据试验

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.elasticsearch.hadoop.mr.EsOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * MapReduce driver that reads text lines from the input path given on the command
 * line and indexes them into the Elasticsearch resource {@code job1/51} through
 * {@link EsOutputFormat}. The input lines are declared to be JSON already.
 *
 * <p>Usage: {@code hadoop jar eshadoop.jar H2EJob <input path>}
 */
public class H2EJob {

    private static final Logger LOG = LoggerFactory.getLogger(H2EJob.class);

    /**
     * Configures and runs the job, then blocks until it finishes.
     *
     * <p>Exit status: {@code 2} when the input path argument is missing or extra
     * arguments are supplied, {@code 1} when the job reports failure or an exception
     * is thrown. This lets calling scripts detect a failed import instead of always
     * seeing status 0.
     *
     * @param args generic Hadoop options followed by exactly one input path
     */
    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            // Speculative execution is switched off for both phases, presumably so
            // duplicate task attempts do not index the same documents twice.
            conf.setBoolean("mapreduce.map.speculative", false);
            conf.setBoolean("mapreduce.reduce.speculative", false);
            // Elasticsearch node to connect to
            conf.set("es.nodes", "centos.host1:9200");
            // Elasticsearch index/type to write to
            conf.set("es.resource", "job1/51");
            // The data on Hadoop is already JSON, so it can be indexed directly
            conf.set("es.input.json", "yes");

            String[] oArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (oArgs.length != 1) {
                LOG.error("Usage: H2EJob <input path>");
                System.exit(2);
            }

            Job job = Job.getInstance(conf, "51JOBH2E");
            job.setJarByClass(H2EJob.class);
            job.setMapperClass(H2EMapper.class);
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputFormatClass(EsOutputFormat.class);

            FileInputFormat.addInputPath(job, new Path(oArgs[0]));

            boolean succeeded = job.waitForCompletion(true);
            System.out.println(succeeded);
            if (!succeeded) {
                // Propagate job failure to the caller through the exit status.
                System.exit(1);
            }
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            System.exit(1);
        }
    }

}

/**
 * Forwards every input line unchanged as the output value, discarding the input key.
 *
 * <p>{@code setup}, {@code run} and {@code cleanup} are inherited from {@link Mapper}
 * as-is; this class adds no lifecycle work of its own.
 */
class H2EMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    /**
     * Emits {@code value} under a {@link NullWritable} key.
     *
     * @param key     input key, ignored
     * @param value   one input line, passed through as-is
     * @param context task context used to emit the line
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(NullWritable.get(), value);
    }

}

执行 hadoop jar eshadoop.jar H2EJob /user/data/es/job 后,可以在ES上看到数据已经索引过来。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息