spark之DataFrame分析日志文件
2016-06-05 17:23
323 查看
场景:我们利用DataFrame对日志中出现的错误次数进行一个统计。
一,准备日志文件:
我这里是使用的hadoop的日志文件,因为以前配置文件没有配好,所有每次启动hadoop会有错误的信息,记录在日志文件。
二,书写流程:
三,代码展示:
一,准备日志文件:
我这里是使用的hadoop的日志文件,因为以前配置文件没有配好,所有每次启动hadoop会有错误的信息,记录在日志文件。
二,书写流程:
1,读取日志文件,将文件转化成RDD。 2,将日志文件通过map函数将数据转化行的格式返回。 3,创建元类型, 即创建schema,为RDD转化为DataFrame提供格式。 4,根据元数据类型将JavaRDD<Row>转化成DataFrame 5,使用过滤器筛选错误信息。 6,输出错误信息统计次数。
三,代码展示:
import java.util.ArrayList; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; public class CountErrors2 { public static void main(String[] args) { SparkConf conf=new SparkConf(); conf.set("spark.testing.memory", "2147480000"); JavaSparkContext sc=new JavaSparkContext("local","CountErrors2",conf); System.out.println("建立连接成功:"+conf); //读取日志文件 JavaRDD<String> textFile=sc.textFile("hdfs://192.168.61.128:9000/spark001/hadoop.log"); //将日志文件通过map函数将数据转化行的格式返回 JavaRDD<Row> rowRDD=textFile.map(new Function<String,Row>(){ public Row call(String line) throws Exception { return RowFactory.create(line); }}); //StructField 结构化文件格式 List<StructField> fileds=new ArrayList<StructField>(); //创建一个结构化文件,三个参数 分别是 name 值的类型 是否设置为表 fileds.add(DataTypes.createStructField("line", DataTypes.StringType, true)); // 创建元类型, 即创建schema StructType schema=DataTypes.createStructType(fileds); SQLContext sqlContext=new org.apache.spark.sql.SQLContext(sc); //根据元数据类型将JavaRDD<Row>转化成DataFrame DataFrame df=sqlContext.createDataFrame(rowRDD, schema); //过滤文件 DataFrame errors=df.filter(df.col("line").like("%ERROR%")); //统计出现次数 long result=errors.count(); System.out.println("log文件总共记录有:"+result+":次出错!"); } }
相关文章推荐
- 详解HDFS Short Circuit Local Reads
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- 我是运营,我没有假期
- Hadoop_2.1.0 MapReduce序列图
- 使用Hadoop搭建现代电信企业架构
- Spark随谈——开发指南(译)
- 单机版搭建Hadoop环境图文教程详解
- Spark,一种快速数据分析替代方案
- Mootools 1.2教程 函数
- autoit InputBox 函数
- 文件遍历排序函数
- DB2数据库的安装
- C#实现把指定数据写入串口
- “传奇”图象数据存储方式
- PostgreSQL教程(二):模式Schema详解
- 关于C#中排序函数的总结
- Oracle 函数大全[字符串函数,数学函数,日期函数]第1/4页
- 修复mysql数据库
- ASP下经常用的字符串等函数参考资料