mapreduce的结果写入到mysql中
2016-12-30 16:52
253 查看
输入文件为 Apache 访问日志:每行以客户端 IP 开头,字段之间以空格分隔(每行至少两个字段),例如:192.168.1.1 GET /index.html
把统计的IP 和每个ip的总数存入到mysql中
1、自定义类,该类实现了从日志记录中判断数据是否合格
/**
 * Wraps one raw log line and decides whether it is a usable record.
 * A line is valid when it is non-null, non-empty and contains at least
 * two space-separated fields; the first field is taken as the client IP.
 */
public class TextLine {
    private String ip;
    // true only when the wrapped line passed every validity check
    private boolean flag;

    public TextLine(String line) {
        this.flag = parse(line);
    }

    /** Validates the line; on success stores the IP and reports true. */
    private boolean parse(String line) {
        if (line == null || "".equals(line)) {
            return false;
        }
        String[] fields = line.split(" ");
        // A usable record needs at least two space-separated fields.
        if (fields.length < 2) {
            return false;
        }
        // The first field of an access-log line is the client IP.
        ip = fields[0];
        return true;
    }

    /** @return true when the line is a well-formed log record */
    public boolean isRight() {
        return flag;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }
}
2、自定义输出格式类
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;
//向mysql中写入数据
public class TblsWritable implements Writable, DBWritable{
String ip;
int count;
public TblsWritable() {
}
public TblsWritable(String ip, int count) {
this.ip = ip;
this.count = count;
}
public void write(PreparedStatement statement) throws SQLException {
statement.setString(1, this.ip);
statement.setInt(2, this.count);
}
public void readFields(ResultSet resultSet) throws SQLException {
this.ip = resultSet.getString(1);
this.count = resultSet.getInt(2);
}
public void write(DataOutput out) throws IOException {
out.writeUTF(ip);
out.writeInt(count);
}
public void readFields(DataInput in) throws IOException {
ip = in.readUTF();
count = in.readInt();
}
}
3、写mapreduce实现类
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
//分析网络服务器上的Apache日志,统计每个IP访问资源的次数,并将结果写入到mysql数据库中。
public class IPCountMR {
public static class IPCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
private IntWritable one = new IntWritable(1);
private Text k = new Text();
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
//判断该行数据是否合法
TextLine textLine = new TextLine(line);
if (textLine.isRight()) {
k.set(textLine.getIp());
context.write(k, one);
}
}
}
public static class IPCountReducer extends Reducer<Text, IntWritable, TblsWritable, TblsWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
context.write(new TblsWritable(key.toString(), sum), null);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
"jdbc:mysql://192.168.10.1:3306/hadoop", "root", "root");
// 新建一个任务
Job job = Job.getInstance(conf, "ip-count");
// 设置主类
job.setJarByClass(IPCountMR.class);
// 输入路径
FileInputFormat.addInputPath(job, new Path(args[0]));
// Mapper
job.setMapperClass(IPCountMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// Reducer
job.setReducerClass(IPCountReducer.class);
job.setOutputKeyClass(TblsWritable.class);
job.setOutputValueClass(TblsWritable.class);
// 输出格式
job.setOutputFormatClass(DBOutputFormat.class);
// 输出到哪些表、字段
DBOutputFormat.setOutput(job, "log", "ip", "count");
// 添加mysql数据库jar
job.addArchiveToClassPath(new Path("/lib/mysql/mysql-connector-java-5.1.18-bin.jar"));
//提交任务
job.waitForCompletion(true);
}
}
其中把mysql的驱动包mysql-connector-java-5.1.18-bin.jar上传到HDFS中
打包上传到服务器运行
[sparkadmin@hadoop4 test]$ hadoop jar ipcount.jar IPCountMR /tom
(若打包时已在 manifest 中指定主类,可省略类名 IPCountMR;/tom 为 HDFS 上的日志输入路径)
在mysql中查看
(以下内容为上文的重复:把统计的 IP 和每个 IP 的总数存入到 MySQL 中)
1、自定义类,该类实现了从日志记录中判断数据是否合格
/**
 * Wraps one raw log line and decides whether it is a usable record.
 * A line is valid when it is non-null, non-empty and contains at least
 * two space-separated fields; the first field is taken as the client IP.
 */
public class TextLine {
    private String ip;
    // true only when the wrapped line passed every validity check
    private boolean flag;

    public TextLine(String line) {
        this.flag = parse(line);
    }

    /** Validates the line; on success stores the IP and reports true. */
    private boolean parse(String line) {
        if (line == null || "".equals(line)) {
            return false;
        }
        String[] fields = line.split(" ");
        // A usable record needs at least two space-separated fields.
        if (fields.length < 2) {
            return false;
        }
        // The first field of an access-log line is the client IP.
        ip = fields[0];
        return true;
    }

    /** @return true when the line is a well-formed log record */
    public boolean isRight() {
        return flag;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }
}
2、自定义输出格式类
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;
//向mysql中写入数据
public class TblsWritable implements Writable, DBWritable{
String ip;
int count;
public TblsWritable() {
}
public TblsWritable(String ip, int count) {
this.ip = ip;
this.count = count;
}
public void write(PreparedStatement statement) throws SQLException {
statement.setString(1, this.ip);
statement.setInt(2, this.count);
}
public void readFields(ResultSet resultSet) throws SQLException {
this.ip = resultSet.getString(1);
this.count = resultSet.getInt(2);
}
public void write(DataOutput out) throws IOException {
out.writeUTF(ip);
out.writeInt(count);
}
public void readFields(DataInput in) throws IOException {
ip = in.readUTF();
count = in.readInt();
}
}
3、写mapreduce实现类
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
//分析网络服务器上的Apache日志,统计每个IP访问资源的次数,并将结果写入到mysql数据库中。
public class IPCountMR {
public static class IPCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
private IntWritable one = new IntWritable(1);
private Text k = new Text();
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
//判断该行数据是否合法
TextLine textLine = new TextLine(line);
if (textLine.isRight()) {
k.set(textLine.getIp());
context.write(k, one);
}
}
}
public static class IPCountReducer extends Reducer<Text, IntWritable, TblsWritable, TblsWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
context.write(new TblsWritable(key.toString(), sum), null);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
"jdbc:mysql://192.168.10.1:3306/hadoop", "root", "root");
// 新建一个任务
Job job = Job.getInstance(conf, "ip-count");
// 设置主类
job.setJarByClass(IPCountMR.class);
// 输入路径
FileInputFormat.addInputPath(job, new Path(args[0]));
// Mapper
job.setMapperClass(IPCountMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// Reducer
job.setReducerClass(IPCountReducer.class);
job.setOutputKeyClass(TblsWritable.class);
job.setOutputValueClass(TblsWritable.class);
// 输出格式
job.setOutputFormatClass(DBOutputFormat.class);
// 输出到哪些表、字段
DBOutputFormat.setOutput(job, "log", "ip", "count");
// 添加mysql数据库jar
job.addArchiveToClassPath(new Path("/lib/mysql/mysql-connector-java-5.1.18-bin.jar"));
//提交任务
job.waitForCompletion(true);
}
}
其中把mysql的驱动包mysql-connector-java-5.1.18-bin.jar上传到HDFS中
打包上传到服务器运行
[sparkadmin@hadoop4 test]$ hadoop jar ipcount.jar IPCountMR /tom
(若打包时已在 manifest 中指定主类,可省略类名 IPCountMR;/tom 为 HDFS 上的日志输入路径)
在mysql中查看
相关文章推荐
- 新版api mapreduce reduce结果写入mysql
- Mapreduce 结果写入Hbase两种方法对比
- 将mysql执行结果按照固定格式写入文件
- 26.session聚合统计之计算统计结果并写入MySQL
- mysql在终端执行sql并把结果写入文件的方法
- spark-python从hdfs文本数据(美国历年出生人数)统计然后把结果数据写入mysql
- mysql将查询结果写入变量,存储过程
- 从Mysql查询的结果,写入txt文件中,每个字段用|分隔,下一条数据要换行
- Spark将计算结果写入到Mysql中
- MySQL查询结果写入到文件总结
- Spark中ip映射数据应用库,二分查找省份,将结果写入mysql
- php查询mysql中的数据,并将结果返回为JSON格式,提取JSON中的数据,写入页面不同的id号中
- mapreduce输出结果到mysql
- PYTHON--定期监测服务器端口,并将结果写入MYSQL
- Python 2.x 将mysql查询结果写入excel并打包成zip
- Spark将计算结果写入到Mysql中
- 将Mysql查询结果写入到文件
- mapreduce输出结果到mysql
- spark将计算结果写入到mysql中
- saltstack--在master上直接将返回结果写入mysql