Spark把RDD数据保存到一个单个文件中
2016-09-03 11:26
661 查看
Spark是当前最流行的分布式数据处理框架之一,相比于Hadoop,Spark在数据的处理方面更加灵活方便。然而在最近的使用中遇到了一点小麻烦:Spark保存文件的的函数(如saveAsTextFile)在保存数据时都需要新建一个目录,然后在这个目录下分块保存文件。如果我们想在原有的目录下增加一个文件(而不是增加一个目录),Spark就无能为力了。
有网友给出建议,用
把数据合并到一个分区中,然而得到的结果是这样的:
Spark仍然是新建了一个目录test.txt,然后在这个目录下把数据都保存在了part-00000文件中。
Spark的保存模式的设定注定了在保存数据的时候只能新建目录,如果想把数据增加到原有的目录中,单独作为一个文件,就只能借助于Hadoop的HDFS操作。下面的例子演示如何用Hadoop的FileSystem实现在已有目录下用一个文件保存Spark数据:
在Spark中处理并保存数据:
(PS:目前还没有看到过单用Spark接口能实现该功能,有知道的大神欢迎指点)
有网友给出建议,用
rddx.repartition(1).saveAsTextFile("test/test.txt") rddx.coalesce(1).saveAsTextFile("test/test.txt")
把数据合并到一个分区中,然而得到的结果是这样的:
$ ./bin/hadoop fs -du -h test/test.txt 0 test/test.txt/_SUCCESS 499.9 M test/test.txt/part-00000
Spark仍然是新建了一个目录test.txt,然后在这个目录下把数据都保存在了part-00000文件中。
Spark的保存模式的设定注定了在保存数据的时候只能新建目录,如果想把数据增加到原有的目录中,单独作为一个文件,就只能借助于Hadoop的HDFS操作。下面的例子演示如何用Hadoop的FileSystem实现在已有目录下用一个文件保存Spark数据:
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.web.resources.ExceptionHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.*; import java.net.URI; /** * 使用Hadoop的FileSystem把数据写入到HDFS */ public class HdfsOperate implements Serializable{ private static Logger logger = LoggerFactory.getLogger(HdfsOperate.class); private static Configuration conf = new Configuration(); private static BufferedWriter writer = null; //在hdfs的目标位置新建一个文件,得到一个输出流 public static void openHdfsFile(String path) throws Exception { FileSystem fs = FileSystem.get(URI.create(path),conf); writer = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(path)))); if(null!=writer){ logger.info("[HdfsOperate]>> initialize writer succeed!"); } } //往hdfs文件中写入数据 public static void writeString(String line) { try { writer.write(line + "\n"); }catch(Exception e){ logger.error("[HdfsOperate]>> writer a line error:" , e); } } //关闭hdfs输出流 public static void closeHdfsFile() { try { if (null != writer) { writer.close(); logger.info("[HdfsOperate]>> closeHdfsFile close writer succeed!"); } else{ logger.error("[HdfsOperate]>> closeHdfsFile writer is null"); } }catch(Exception e){ logger.error("[HdfsOperate]>> closeHdfsFile close hdfs error:" + e); } } }
在Spark中处理并保存数据:
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import hdfsoperate.HdfsOperate; import org.apache.spark.Partition; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.VoidFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import util.NlpModuleWrapper; import java.io.Serializable; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Iterator; import java.util.List; /** * 调用HdfsOperate类的方法把RDD数据保存到Hdfs上 */ public class FeatureExtractor implements Serializable { private static Logger logger = LoggerFactory.getLogger(FeatureExtractor.class); public void extractFeature(JavaSparkContext sc, int repartitionNum) throws Exception { String hdfsPath = "test/corpus/2016-09-02"; //存放原始数据的文件 //Spark可以读取单独的一个文件或整个目录 JavaRDD<String> rddx = sc.textFile(hdfsPath).repartition(repartitionNum); rddx = rddx.map(new ExtractFeatureMap()); //写入hdfs文件位置 String destinationPath = "test/result/2016-09-02" ; //创建Hdfs文件,打开Hdfs输出流 HdfsOperate.openHdfsFile(destinationPath); //分块读取RDD数据并保存到hdfs //如果直接用collect()函数获取List<String>,可能因数据量过大超过内存空间而失败 for (int i = 0; i < repartitionNum; i++) { int[] index = new int[1]; index[0] = i; List<String>[] featureList = rddx.collectPartitions(index); if (featureList.length != 1) { logger.error("[FeatureExtractor]>> featureList.length is not 1!"); } for (String str : featureList[0]) { //写一行到Hdfs文件 HdfsOperate.writeString(str); } } //关闭Hdfs输出流 HdfsOperate.closeHdfsFile(); } class ExtractFeatureMap implements Function<String, String> { @Override public String call(String line) throws Exception { try { //TODO:你自己的操作,返回String类型 } catch (Exception e) { logger.error("[FeatureExtractor]>>GetTokenAndKeywordFeature error:", e); } return null; } } }
(PS:目前还没有看到过单用Spark接口能实现该功能,有知道的大神欢迎指点)
相关文章推荐
- 将数据库内表中的数据导出到txt文档中,并且显示一个对话框,提示用户保存文件的位置
- 在使用Spark Streaming向HDFS中保存数据时,文件内容会被覆盖掉的解决方案
- Linux中Matlab保存多个数据到同一个文件当中
- 【转载】spark读取HDFS文件和保存数据到HDFS
- 解码单个视频及保存yuv数据到文件中
- 利用正则表达式读取txt文件中的邮箱,电话号码,url地址,手机号,将数据一行一个保存到一个新的文件中去
- unserialize的这个问题是由一个emlog论坛用户在使用时报错而发现的 问题表现情况如下: emlog缓存的保存方式是将php的数据对象(数组)序列化(serialize)后以文件的形式存放,
- 利用c语言将一个文件中满足某条件的数据 保存到另外一个文件中
- spark-shell - 将结果保存成一个文件
- 面试问题2:给一个5G的大文件,保存的数据为32位的整型,找到所有出现次数超过两次的数字
- 补 上一个 spark导出hbase表信息的工具类(获取文件夹文件信息、删除文件夹、保存文件)
- 基于yaf框架和uploadify插件,做的一个导入excel文件,查看并保存数据的功能
- spark rdd saveAsTextFile保存为文件
- Spark RDD/DataFrame map保存数据的两种方式
- 数据满3条自动保存到一个文件里面
- Spark2加载保存文件,数据文件转换成数据框dataframe
- 基于yaf框架和uploadify插件,做的一个导入excel文件,查看并保存数据的功能
- Spark:将RDD[List[String,List[Person]]]中的List[Person]通过spark api保存为hdfs文件时一直出现not serializable task,没办法找到"spark自定义Kryo序列化输入输出API"
- 如何将WEB页面中的数据保存为一个word文件
- 用spark(spark-shell),从本地文件创建一个RDD