Spark 2.1.0: Reading an External txt File and Outputting a DataFrame
2018-01-05 10:23
Requirement: start Spark development on Spark 2.1.0. There are not many tutorials online for Spark 2.0 and later, so I wrote my own.
PS: the official guide covers this as well:
http://spark.apache.org/docs/2.1.0/sql-programming-guide.html#data-sources
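For a file where every line is already a bare JSON object, the built-in JSON data source described in that guide reads it directly. A minimal sketch (the path is a placeholder; note this would not handle the prefixed lines my file uses below):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JsonSourceExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("JsonSource")
                .master("local[2]")
                .getOrCreate();
        // Spark's JSON source expects one JSON object per line.
        Dataset<Row> df = spark.read().json("/data/json/books.json");
        df.printSchema();
        df.show();
        spark.close();
    }
}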
Input: an external txt file whose lines each contain a JSON string. In this file, every line carries a prefix separated from the JSON by "____", which the code strips off.
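For reference, a hypothetical input line might look like this (field list abbreviated; the prefix before "____" is discarded by the parsing code below):

rowkey001____{"bookId":"b001","content":"some highlighted text","id":"1", ...}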
Code:
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.json.JSONObject;

public class SparkSession2 {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Spark2")
                .master("local[2]")
                .getOrCreate();

        // Each input line is "<prefix>____<json>"; keep only the JSON part.
        JavaRDD<String> input = spark.sparkContext()
                .textFile("/data/json/part-r-00000.txt", 1)
                .toJavaRDD()
                .map(x -> x.split("____")[1]);

        // Parse each JSON string into a Book bean.
        JavaRDD<Book> books = input.map(new Function<String, Book>() {
            public Book call(String s) throws Exception {
                JSONObject jsons = new JSONObject(s);
                Book book = new Book();
                book.setBookid(jsons.get("bookId").toString());
                book.setContent(jsons.get("content").toString());
                book.setContentStartPos(jsons.get("contentStartPos").toString());
                book.setCoord(jsons.get("coord").toString());
                book.setId(jsons.get("id").toString());
                book.setLineColor(jsons.get("lineColor").toString());
                book.setLineType(jsons.get("lineType").toString());
                book.setLineWidth(jsons.get("lineWidth").toString());
                book.setNoteCatalog(jsons.get("noteCatalog").toString());
                book.setNoteLabels(jsons.get("noteLabels").toString());
                book.setNoteOrigin(jsons.get("noteOrigin").toString());
                book.setNotePath(jsons.get("notePath").toString());
                book.setNotePostil(jsons.get("notePostil").toString());
                book.setNoteType(jsons.get("noteType").toString());
                book.setPageAngle(jsons.get("pageAngle").toString());
                book.setPageHeight(jsons.get("pageHeight").toString());
                book.setPageIndex(jsons.get("pageIndex").toString());
                book.setPageWidth(jsons.get("pageWidth").toString());
                book.setPdfId(jsons.get("pdfId").toString());
                book.setUpdateTime(jsons.get("updateTime").toString());
                book.setUserName(jsons.get("userName").toString());
                book.setSourceType(jsons.get("sourceType").toString());
                return book;
            }
        });

        // Turn the RDD of beans into a DataFrame and register it as a temp view.
        Dataset<Row> bookdf = spark.createDataFrame(books, Book.class);
        bookdf.createOrReplaceTempView("book");
        bookdf.show();

        // Query a single column with Spark SQL.
        Dataset<Row> bIddf = spark.sql("select bookid from book");
        bIddf.show();

        spark.close();
    }
}
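The Book class is not shown in the original post. A minimal JavaBean sketch, with field names inferred from the setters above and trimmed to a few fields for brevity (the rest follow the same pattern) — createDataFrame derives the schema from the bean's public getters and setters, so they must exist for every field:

public class Book implements java.io.Serializable {
    private String bookid;
    private String content;
    private String id;
    // ... the remaining fields (coord, lineColor, pageIndex, etc.) follow the same pattern

    public String getBookid() { return bookid; }
    public void setBookid(String bookid) { this.bookid = bookid; }
    public String getContent() { return content; }
    public void setContent(String content) { this.content = content; }
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
}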
Note: this project was developed in IDEA. Spark's standalone mode produces a large amount of log output, so to keep the results from being drowned in logs, the log level is set so that only error messages are printed and the remaining WARN/INFO messages are suppressed.
(Maven) Create a log4j.properties file under resources:
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.threshold=INFO
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%5p] - %c -%F(%L) -%m%n
log4j.appender.FILE=org.apache.log4j.RollingFileAppender
log4j.appender.FILE.Append=true
log4j.appender.FILE.File=logs/log4jtest.log
log4j.appender.FILE.Threshold=INFO
log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
log4j.appender.FILE.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%5p] - %c -%F(%L) -%m%n
log4j.appender.FILE.MaxFileSize=10MB
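Alternatively (not in the original post), the log level can be set programmatically right after the session is created, which avoids touching log4j.properties:

SparkSession spark = SparkSession.builder()
        .appName("Spark2")
        .master("local[2]")
        .getOrCreate();
// Suppress everything below ERROR in this application's log output.
spark.sparkContext().setLogLevel("ERROR");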