您的位置:首页 > 大数据

大数据时代多表关联数据同步之SPARK实现(二)

2016-08-23 21:31 609 查看
前期博主有写过spark同步数据的博文,当时由于业务需求相对简单,只简单的实现了单表load功能。业务的发展驱动着技术的不段革新,猿猿们也在不断设计更加合理更加便捷更加优雅的业务模块,只是为了让你们用的爽用的简单~~~

背景

假设我们现在需要借助spark同步如下

select t.id as id ,t.title as title ,t.created as created,t.keywords as keywords,c.content as content,t.categoryid as categoryid,t.is_recommend as is_recommend
from tcrm_kms t ,tcrm_kms_con c where t.id = c.kms_id and t.isdelete = 0


数据以一定的业务格式需要同步至hdfs或者其他文件系统,该怎么做呢?

大家可以回忆下大数据时代数据同步之SPARK实现(一)中的内容,spark威武雄壮之处在于其1.0版本内提供了很友好的sqlcontext,dataset api,这也就意味着你只要会写sql,就可以很好的驾驭spark成为大数据开发工程师。

spark 1.0版本我们是这样写的:

val sparkConf = new SparkConf().setAppName("ant").setMaster("local[1]")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)


spark在2.0版本提供了sparksession,sparksession其实是sqlcontext和hivecontext的组合,内部封装了soarkcontext,计算还是交由sparkcontext完成的。我们是这样写的:

val sparkSession = SparkSession.builder.master("loacl[1]").appname("ant").getOrCreate()


工程化

配置文件

<config>
<conf>
<sparkMaster>local[1]</sparkMaster>
<sparkJarsPath>jar地址</sparkJarsPath>
<sparkConf>spark.executor.memory,15G;</sparkConf>
<filePath>保存文件地址</filePath>
</conf>

<table>
<!--是否执行 0表示执行-->
<isAble>0</isAble>
<!--保存文件名称-->
<fileName></fileName>
<!--主表名称-->
<tableName>tcrm_kms</tableName>
<!--数据库用户名-->
<userName>test</userName>
<!--数据库密码-->
<passWord>test</passWord>
<!--数据库链接-->
<url>jdbc:mysql://localhost:3306/test</url>
<driver>com.mysql.jdbc.Driver</driver>
<!--需要返回的字段  * itemcode  itemcode as ic 多个逗号相隔   可以在as后面自定义快搜别名-->
<needFields>id,title,keywords,categoryid</needFields>
<!-- 保存文件数-->
<partitions>15</partitions>
<!--筛选条件 线程数等 多个逗号相隔-->
<prdeicates>isdelete = 0</prdeicates>
<slave>
<!--从表名称-->
<slaveTableName>tcrm_kms_con</slaveTableName>
<!--需要返回的字段  * itemcode  itemcode as ic 多个逗号相隔   可以在as后面自定义快搜别名-->
<slaveNeedFields>content</slaveNeedFields>
<!--筛选条件 多个逗号相隔-->
<slavePrdeicates></slavePrdeicates>
<!--join left join  right join-->
<joinType>join</joinType>
<combine>
<!--on之后条件  如   a = b 至少一个-->
<masterFiled>id</masterFiled>
<slaveFiled>kms_id</slaveFiled>
</combine>
</slave>
</table>
</config>


标签conf中保存基础配置信息,例如spark运行参数配置,格式为key,value;key,value

标签table中为一个运行job,如果是单表数据同步则忽略标签slave中信息,如果是多表join同步,则设置slave信息,主表写在tableName里,从表写在slaveTableName里,joinType字段包括

'inner', 'outer', 'full', 'fullouter','leftouter', 'left', 'rightouter', 'right', 'leftsemi'


sqlcontext支持的语法,标签combine只要是配置关键字on之后的等于条件。标签needFields支持 itemcode as ic这种写法,此时masterFiled需要写成ic而不是itemcode。如果是三表关联,只需要配置slave标签内容即可。

DTO

//基本配置信息
private ConfDO sparkConfDO;
//同步job
private List<JobDO> jobDOs;


ConfDO

//master
private String sparkMaster;
//jar包地址
private String sparkJarPath;
//spark配置信息
private Map<String, String> sparkConf;
//文件保存地址
private String filePath;


JobDO

//保存文件名  如果为空 则为tablename
private String fileName;
//同步表名
private String tableName;
//用户名
private String userName;
//密码
private String passWord;
//url
private String url;
//数据库驱动
private String driver;
//需要返回的字段
private String needFields;
// 返回文件数
private int partitions;
//是否执行
private String isAble;
//从表信息
private List<SlaveDO> slaveDOs;
//分区信息    筛选条件信息
private String[] prdeicates;


SlaveDO

//表名
private String tableName;
//返回字段
private String needFields;
//比较字段
private List<CombineDO> CombineDOs;
//筛选条件
private String[] prdeicates;
//关联类型
private String joinType;


CombineDO

//关联主字段
private String masterFiled;
//从字段
private String slaveFiled;


配置文件的基本格式就如上所述,后续有需要调整或者优化的就在此基础上进行二次开发即可,方便灵活,满足多表join查询数据同步的配置化工作。

采用saxhadler读取xml,这里需要注意的是sax对大文件的读取会出现数据丢失的现象(原理可以查阅相关资料),大家可以使用stringbuilder,stringbuffer等避免该问题的发生

@Override
public void characters(char[] ch, int start, int length) throws SAXException {
String string = new String(ch, start, length);
//        System.out.println(string.replace("\\n", "").trim());
sb.append(string);
}


spark java api实现

SQLContext sqlContext = new SQLContext(sc);
DataFrame master = sqlContext.read().jdbc(jobDO.getUrl(), jobDO.getTableName(), jobDO.getPrdeicates(), properties);
master.registerTempTable(MASTER);
if (!CheckUtil.checkListIsNull(jobDO.getSlaveDOs())) {
SlaveDO slaveDO = null;
String sql = "";
for (int i = 0; i < jobDO.getSlaveDOs().size(); i++) {
slaveDO = jobDO.getSlaveDOs().get(i);
if (StringUtils.isBlank(slaveDO.getTableName())) {
continue;
}
sqlContext.read().jdbc(jobDO.getUrl(), slaveDO.getTableName(), slaveDO.getPrdeicates(), properties).registerTempTable(SLAVE + i);
sql = LoadSql.bulidSql(jobDO.getNeedFields(), 0 == i ? MASTER : MASTER + i, slaveDO, SLAVE + i);
//System.out.println(sql);
if (StringUtils.isBlank(sql)) {
continue;
}
master = sqlContext.sql(sql);
master.registerTempTable(MASTER + (i + 1));
}
}


spark 1.6版本最多只支持两张表关联查询,如果多张表关联,则需要将任务进行分解,这也就是为啥会出现根据slave标签进行循环关联在注册表的过程,博主这里已经无力吐槽了~~~真不想吐槽了。。。

sql组装

public static String bulidSql(String masterFields, String masterTable, SlaveDO slaveDO, String slaveTable) {
String factor = createFactor(slaveDO.getCombineDOs());
if(StringUtils.isNotBlank(factor)){
return  SELECT + BLANK + createFileds(masterFields, MASTER) + COMMA + createFileds(slaveDO.getNeedFields(), SLAVE) + BLANK + FROM + BLANK + masterTable + BLANK + MASTER + BLANK + slaveDO.getJoinType() + BLANK + slaveTable + BLANK + SLAVE + BLANK  + factor;
}
return  factor;
}


额~~好吧,我自己也都觉得这sql也是无了奈了

ok,得到上述的RDD之后,就可以根据你的业务需要自定义输出啦,想输出啥格式就可以输出啥格式,如果需要schema同样可以。

总结

或许你已经发现了,竟然关联都能实现了,什么distinct,sort,group当然也没啥问题,这些只要对RDD进行二次处理即可。Spark给了我们程序猿一个很好的平台,尤其动态资源分配策略,虽然job启动初期是根据集群资源分配,这个有点扯犊子,但是在运行过程中的动态资源分配还是很给力的,所以博主在想能不能只job运行过程中在给spark任务分配资源呐?

现在开源社区有这大量的大数据相关的项目,希望有一天也能看到我的开源项目,突然好期待呀。

至此spark数据同步系列之二已经写完了,博主也准备收拾收拾回家整理屋子了,周末刚经历了一次搬家,对了,在这里万分的感谢博主媳妇及媳妇她爹周末的帮忙,啦啦啦~~
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  etl 大数据 spark 数据