您的位置:首页 > 数据库 > Mongodb

spark读取mongodb并解决循环嵌套array的拆分,属性不存在整个对象丢失问题。

2018-07-30 21:57 447 查看

1、创建SQLContext

SQLContext sqlContext = new SQLContext(sc)

2、拼接mongodb连接字符串

if(UserName!=null && !"".equals(UserName)){
if(PassWord!=null && !"".equals(PassWord)){
url="mongodb://"+UserName+":"+PassWord+"@"+IP+":"+Port+"/"+DataBase+"."+Table;
}else{
url="mongodb://"+IP+":"+Port+"/"+DataBase+"."+Table;
}
}else{
url="mongodb://"+IP+":"+Port+"/"+DataBase+"."+Table;
}

3、//定义并组装连接条件

Map<String,String> options=new HashMap<String,String>();
options.put("spark.mongodb.input.uri", url);
options.put("spark.mongodb.input.partitioner" , "MongoPaginateBySizePartitioner");
options.put("spark.mongodb.input.partitionerOptions.partitionKey"  , "_id");
options.put("spark.mongodb.input.partitionerOptions.partitionSizeMB",Partition);

4、读取所有数据数据

DataFrame df = sqlContext.read().format("com.mongodb.spark.sql").options( options).load();

df.printSchema();

5、循环schema并拆分数据。

for(StructField sf:df.schema().fields()){
this.li.add(sf.name());
sname=StringUtils.join(li.toArray(),"_");
if(sf.dataType().typeName()=="array"||"array".equals(sf.dataType().typeName())){

ArrayType at=(ArrayType)sf.dataType();
df=df.withColumn(sname, functions.explode(functions.when(df.col(sname).isNull(), functions.array(functions.lit(null).cast(at.elementType())))
.when(functions.size(df.col(sname)).equalTo(0), functions.array(functions.lit(null).cast(at.elementType())))
.otherwise(df.col(sname))));
df=array_loop((ArrayType)sf.dataType(),df);
}else if(sf.dataType().typeName()=="struct"||"struct".equals(sf.dataType().typeName())){
df=struct_loop((StructType)sf.dataType(),df);
df=df.drop(sname);
}
cols.add(StringUtils.join(li.toArray(),"."));
this.li.remove(sf.name());
}
public DataFrame array_loop(ArrayType arr,DataFrame df){
if(arr.elementType().typeName()=="struct"||"struct".equals(arr.elementType().typeName())){
df=struct_loop((StructType)arr.elementType(),df);
}
df=df.drop(sname);
return df;
}
public DataFrame struct_loop(StructType st,DataFrame df){
for(StructField sf:st.fields()){
this.li.add(sf.name());
String sname=StringUtils.join(li.toArray(),"_");
String sname1=StringUtils.join(li.toArray(),".");
if(sf.dataType().typeName()=="array"||"array".equals(sf.dataType().typeName())){

ArrayType at=(ArrayType)sf.dataType();
df=df.withColumn(sname, functions.explode(functions.when(df.col(sname).isNull(), functions.array(functions.lit(null).cast(at.elementType())))
.when(functions.size(df.col(sname)).equalTo(0), functions.array(functions.lit(null).cast(at.elementType())))
.otherwise(df.col(sname))));
df=array_loop((ArrayType)sf.dataType(),df);
}else if(sf.dataType().typeName()=="struct"||"struct".equals(sf.dataType().typeName())){
df=struct_loop((StructType)sf.dataType(),df);
}
df=df.selectExpr(sname1+" as "+sname,"*");
cols.add(StringUtils.join(li.toArray(),"."));
this.li.remove(sf.name());
}
return df;
}
阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐