您的位置:首页 > 运维架构

spark之DataFrame分析日志文件

2016-06-05 17:23 323 查看
场景:我们利用DataFrame对日志中出现的错误次数进行一个统计。

一,准备日志文件:

我这里是使用的hadoop的日志文件,因为以前配置文件没有配好,所有每次启动hadoop会有错误的信息,记录在日志文件。

二,书写流程:

1,读取日志文件,将文件转化成RDD。
2,将日志文件通过map函数将数据转化行的格式返回。
3,创建元类型, 即创建schema,为RDD转化为DataFrame提供格式。
4,根据元数据类型将JavaRDD<Row>转化成DataFrame
5,使用过滤器筛选错误信息。
6,输出错误信息统计次数。


三,代码展示:

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class CountErrors2 {
public static void main(String[] args) {
SparkConf conf=new SparkConf();
conf.set("spark.testing.memory", "2147480000");
JavaSparkContext sc=new JavaSparkContext("local","CountErrors2",conf);
System.out.println("建立连接成功:"+conf);
//读取日志文件
JavaRDD<String> textFile=sc.textFile("hdfs://192.168.61.128:9000/spark001/hadoop.log");
//将日志文件通过map函数将数据转化行的格式返回
JavaRDD<Row> rowRDD=textFile.map(new Function<String,Row>(){
public Row call(String line) throws Exception {
return RowFactory.create(line);
}});

//StructField 结构化文件格式
List<StructField> fileds=new ArrayList<StructField>();
//创建一个结构化文件,三个参数 分别是  name 值的类型   是否设置为表
fileds.add(DataTypes.createStructField("line", DataTypes.StringType, true));
// 创建元类型, 即创建schema
StructType schema=DataTypes.createStructType(fileds);
SQLContext sqlContext=new org.apache.spark.sql.SQLContext(sc);
//根据元数据类型将JavaRDD<Row>转化成DataFrame
DataFrame df=sqlContext.createDataFrame(rowRDD, schema);
//过滤文件
DataFrame errors=df.filter(df.col("line").like("%ERROR%"));
//统计出现次数
long result=errors.count();
System.out.println("log文件总共记录有:"+result+":次出错!");
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息