spark读取mongodb并解决循环嵌套array的拆分,属性不存在整个对象丢失问题。
2018-07-30 21:57
447 查看
1、创建SQLContext
SQLContext sqlContext = new SQLContext(sc)
2、拼接mongodb连接字符串
if(UserName!=null && !"".equals(UserName)){ if(PassWord!=null && !"".equals(PassWord)){ url="mongodb://"+UserName+":"+PassWord+"@"+IP+":"+Port+"/"+DataBase+"."+Table; }else{ url="mongodb://"+IP+":"+Port+"/"+DataBase+"."+Table; } }else{ url="mongodb://"+IP+":"+Port+"/"+DataBase+"."+Table; }
3、//定义并组装连接条件
Map<String,String> options=new HashMap<String,String>(); options.put("spark.mongodb.input.uri", url); options.put("spark.mongodb.input.partitioner" , "MongoPaginateBySizePartitioner"); options.put("spark.mongodb.input.partitionerOptions.partitionKey" , "_id"); options.put("spark.mongodb.input.partitionerOptions.partitionSizeMB",Partition);
4、读取所有数据数据
DataFrame df = sqlContext.read().format("com.mongodb.spark.sql").options( options).load(); df.printSchema();
5、循环schema并拆分数据。
for(StructField sf:df.schema().fields()){ this.li.add(sf.name()); sname=StringUtils.join(li.toArray(),"_"); if(sf.dataType().typeName()=="array"||"array".equals(sf.dataType().typeName())){ ArrayType at=(ArrayType)sf.dataType(); df=df.withColumn(sname, functions.explode(functions.when(df.col(sname).isNull(), functions.array(functions.lit(null).cast(at.elementType()))) .when(functions.size(df.col(sname)).equalTo(0), functions.array(functions.lit(null).cast(at.elementType()))) .otherwise(df.col(sname)))); df=array_loop((ArrayType)sf.dataType(),df); }else if(sf.dataType().typeName()=="struct"||"struct".equals(sf.dataType().typeName())){ df=struct_loop((StructType)sf.dataType(),df); df=df.drop(sname); } cols.add(StringUtils.join(li.toArray(),".")); this.li.remove(sf.name()); }
public DataFrame array_loop(ArrayType arr,DataFrame df){ if(arr.elementType().typeName()=="struct"||"struct".equals(arr.elementType().typeName())){ df=struct_loop((StructType)arr.elementType(),df); } df=df.drop(sname); return df; } public DataFrame struct_loop(StructType st,DataFrame df){ for(StructField sf:st.fields()){ this.li.add(sf.name()); String sname=StringUtils.join(li.toArray(),"_"); String sname1=StringUtils.join(li.toArray(),"."); if(sf.dataType().typeName()=="array"||"array".equals(sf.dataType().typeName())){ ArrayType at=(ArrayType)sf.dataType(); df=df.withColumn(sname, functions.explode(functions.when(df.col(sname).isNull(), functions.array(functions.lit(null).cast(at.elementType()))) .when(functions.size(df.col(sname)).equalTo(0), functions.array(functions.lit(null).cast(at.elementType()))) .otherwise(df.col(sname)))); df=array_loop((ArrayType)sf.dataType(),df); }else if(sf.dataType().typeName()=="struct"||"struct".equals(sf.dataType().typeName())){ df=struct_loop((StructType)sf.dataType(),df); } df=df.selectExpr(sname1+" as "+sname,"*"); cols.add(StringUtils.join(li.toArray(),".")); this.li.remove(sf.name()); } return df; }阅读更多
相关文章推荐
- NetBox环境下ajax用get获取数据,中文乱码的问题解决:对象不支持此属性或方法: 'Response.CharSet'
- GADL/OGR C# 读取Dxf数据时,Feature的Layer属性中文乱码问题的解决
- DataSet读取excel数据丢失问题解决
- (Java控制台程序版)递归打包整个父文件夹下的文件和子文件夹成压缩文件(*.ZIP)以及打包时常见的文件乱码和文件名乱码以及丢失文件或文件损坏问题解决办法
- 解决ADO读取Excel,数据丢失、数据错误、数据乱码问题
- php 中的重载技术——解决调用对象(或类)的未定义的属性或方法出错问题
- spark SQL读取ORC文件从Driver启动到开始执行Task(或stage)间隔时间太长(计算Partition时间太长)且产出orc单个文件中stripe个数太多问题解决方案
- 关于PowerDesigner出现不允许有扩展属性,或对象不存在的解决办法
- 关于PowerDesigner出现不允许有扩展属性,或对象不存在的解决办法
- BeanUtils主要解决 的问题: 把对象的属性数据封装 到对象中
- PowerDesigner16中的对象无效,不允许有扩展属性 问题的解决
- 解决spring-boot项目中无法读取yml配置文件属性问题
- 解决vue数组中对象属性变化页面不渲染问题
- 有关td使用colspan属性后导致td不能充满整个tr的问题解决方法
- 解决PPT中visio对象打印成PDF后文字丢失问题
- 【Java EE 学习 69 上】【struts2】【paramsPrepareParamsStack拦截器栈解决model对象和属性赋值冲突问题】
- 文章标题 spark读取文件过程中发现的问题解决记录
- BeanUtils主要解决 的问题: 把对象的属性数据封装 到对象中
- 解决webstorm调试js时Evaluate脚本获取dom属性丢失问题
- 农行动态口令卡问题解决方案(Key:Vista,IE7,证书已锁定,438对象不支持此属性或方法)