
Spark 2.1.0: Reading an External txt File and Outputting a DataFrame

2018-01-05 10:23
Motivation: I started developing against Spark 2.1.0, but there are few tutorials online for Spark 2.0 and later, so I wrote my own.

P.S. The official documentation also covers this:

http://spark.apache.org/docs/2.1.0/sql-programming-guide.html#data-sources
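As that guide shows, a file that is pure line-delimited JSON can be loaded with no manual parsing at all: spark.read().json() infers the schema from the keys. A minimal sketch (the path /data/json/books.json is a hypothetical example):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JsonReadExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("JsonRead").master("local[2]").getOrCreate();
        // Spark infers the schema from the JSON keys; each input line must be one complete JSON object
        Dataset<Row> df = spark.read().json("/data/json/books.json");
        df.show();
        spark.close();
    }
}

The file in this post, however, has a prefix in front of each JSON object, so it is parsed by hand below.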

Input: an external txt file in which every line contains a JSON string.
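The code below splits each line on "____" and keeps the second field, so each line is assumed to look roughly like this (a made-up example, shown only for shape; the real keys are the ones read in the code):

0001____{"bookId":"b001","id":"n001","content":"some text","pageIndex":"3", ...}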


Code:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.json.JSONObject;

public class SparkSession2 {
    public static void main(String[] args) {
        // SparkSession is the single entry point in Spark 2.x
        SparkSession spark = SparkSession.builder()
                .appName("Spark2")
                .master("local[2]")
                .getOrCreate();

        // Each line looks like "<prefix>____<json>"; keep only the JSON part
        JavaRDD<String> input = spark.sparkContext()
                .textFile("/data/json/part-r-00000.txt", 1)
                .toJavaRDD()
                .map(x -> x.split("____")[1]);

        // Parse each JSON string into a Book bean
        JavaRDD<Book> books = input.map(new Function<String, Book>() {
            public Book call(String s) throws Exception {
                JSONObject json = new JSONObject(s);
                Book book = new Book();

                book.setBookid(json.get("bookId").toString());
                book.setContent(json.get("content").toString());
                book.setContentStartPos(json.get("contentStartPos").toString());
                book.setCoord(json.get("coord").toString());
                book.setId(json.get("id").toString());
                book.setLineColor(json.get("lineColor").toString());
                book.setLineType(json.get("lineType").toString());
                book.setLineWidth(json.get("lineWidth").toString());
                book.setNoteCatalog(json.get("noteCatalog").toString());
                book.setNoteLabels(json.get("noteLabels").toString());
                book.setNoteOrigin(json.get("noteOrigin").toString());
                book.setNotePath(json.get("notePath").toString());
                book.setNotePostil(json.get("notePostil").toString());
                book.setNoteType(json.get("noteType").toString());
                book.setPageAngle(json.get("pageAngle").toString());
                book.setPageHeight(json.get("pageHeight").toString());
                book.setPageIndex(json.get("pageIndex").toString());
                book.setPageWidth(json.get("pageWidth").toString());
                book.setPdfId(json.get("pdfId").toString());
                book.setUpdateTime(json.get("updateTime").toString());
                book.setUserName(json.get("userName").toString());
                book.setSourceType(json.get("sourceType").toString());

                return book;
            }
        });

        // Build a DataFrame from the bean RDD; column names come from the Book getters
        Dataset<Row> bookdf = spark.createDataFrame(books, Book.class);
        bookdf.createOrReplaceTempView("book");
        bookdf.show();

        // Query the temp view with Spark SQL
        Dataset<Row> bIddf = spark.sql("select bookid from book");
        bIddf.show();

        spark.close();
    }
}
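The Book class is not shown in the original post. For the code above to compile, it must be a plain JavaBean with a String field plus getter/setter pair per JSON key; here is a minimal sketch (abbreviated, with the field list assumed from the setter calls above):

import java.io.Serializable;

// Must be Serializable so Spark can ship instances to executors;
// createDataFrame derives column names and types from the bean properties.
public class Book implements Serializable {
    private String bookid;
    private String content;
    private String id;
    // ... one String field per remaining JSON key (coord, lineColor, pageIndex, etc.)

    public String getBookid() { return bookid; }
    public void setBookid(String bookid) { this.bookid = bookid; }
    public String getContent() { return content; }
    public void setContent(String content) { this.content = content; }
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    // ... matching getters/setters for the remaining fields
}

Note also that org.json.JSONObject is not bundled with Spark, so the org.json:json artifact must be added to the Maven dependencies.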


Addendum: this project was developed in IntelliJ IDEA. Running Spark produces a large volume of log output, so to keep the results from being drowned in it, configure logging so that only error messages are printed and WARN/INFO messages are suppressed.

(Maven project) Create a log4j.properties under resources:


log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.threshold=INFO
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%5p] - %c -%F(%L) -%m%n

log4j.appender.FILE=org.apache.log4j.RollingFileAppender
log4j.appender.FILE.Append=true
log4j.appender.FILE.File=logs/log4jtest.log
log4j.appender.FILE.Threshold=INFO
log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
log4j.appender.FILE.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%5p] - %c -%F(%L) -%m%n
log4j.appender.FILE.MaxFileSize=10MB
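
Note that as written only the console appender is attached to the root logger; the FILE appender is configured but never used. If you also want the rolling log file, attach it to the root category as well, e.g.:

log4j.rootCategory=ERROR, console, FILE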