大数据时代多表关联数据同步之SPARK实现(二)
2016-08-23 21:31
609 查看
前期博主有写过spark同步数据的博文,当时由于业务需求相对简单,只简单的实现了单表load功能。业务的发展驱动着技术的不段革新,猿猿们也在不断设计更加合理更加便捷更加优雅的业务模块,只是为了让你们用的爽用的简单~~~
数据以一定的业务格式需要同步至hdfs或者其他文件系统,该怎么做呢?
大家可以回忆下大数据时代数据同步之SPARK实现(一)中的内容,spark威武雄壮之处在于其1.0版本内提供了很友好的sqlcontext,dataset api,这也就意味着你只要会写sql,就可以很好的驾驭spark成为大数据开发工程师。
spark 1.0版本我们是这样写的:
spark在2.0版本提供了sparksession,sparksession其实是sqlcontext和hivecontext的组合,内部封装了soarkcontext,计算还是交由sparkcontext完成的。我们是这样写的:
标签conf中保存基础配置信息,例如spark运行参数配置,格式为key,value;key,value
标签table中为一个运行job,如果是单表数据同步则忽略标签slave中信息,如果是多表join同步,则设置slave信息,主表写在tableName里,从表写在slaveTableName里,joinType字段包括
sqlcontext支持的语法,标签combine只要是配置关键字on之后的等于条件。标签needFields支持 itemcode as ic这种写法,此时masterFiled需要写成ic而不是itemcode。如果是三表关联,只需要配置slave标签内容即可。
DTO
ConfDO
JobDO
SlaveDO
CombineDO
配置文件的基本格式就如上所述,后续有需要调整或者优化的就在此基础上进行二次开发即可,方便灵活,满足多表join查询数据同步的配置化工作。
采用saxhadler读取xml,这里需要注意的是sax对大文件的读取会出现数据丢失的现象(原理可以查阅相关资料),大家可以使用stringbuilder,stringbuffer等避免该问题的发生
spark java api实现
spark 1.6版本最多只支持两张表关联查询,如果多张表关联,则需要将任务进行分解,这也就是为啥会出现根据slave标签进行循环关联在注册表的过程,博主这里已经无力吐槽了~~~真不想吐槽了。。。
sql组装
额~~好吧,我自己也都觉得这sql也是无了奈了
ok,得到上述的RDD之后,就可以根据你的业务需要自定义输出啦,想输出啥格式就可以输出啥格式,如果需要schema同样可以。
现在开源社区有这大量的大数据相关的项目,希望有一天也能看到我的开源项目,突然好期待呀。
至此spark数据同步系列之二已经写完了,博主也准备收拾收拾回家整理屋子了,周末刚经历了一次搬家,对了,在这里万分的感谢博主媳妇及媳妇她爹周末的帮忙,啦啦啦~~
背景
假设我们现在需要借助spark同步如下select t.id as id ,t.title as title ,t.created as created,t.keywords as keywords,c.content as content,t.categoryid as categoryid,t.is_recommend as is_recommend from tcrm_kms t ,tcrm_kms_con c where t.id = c.kms_id and t.isdelete = 0
数据以一定的业务格式需要同步至hdfs或者其他文件系统,该怎么做呢?
大家可以回忆下大数据时代数据同步之SPARK实现(一)中的内容,spark威武雄壮之处在于其1.0版本内提供了很友好的sqlcontext,dataset api,这也就意味着你只要会写sql,就可以很好的驾驭spark成为大数据开发工程师。
spark 1.0版本我们是这样写的:
val sparkConf = new SparkConf().setAppName("ant").setMaster("local[1]") val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc)
spark在2.0版本提供了sparksession,sparksession其实是sqlcontext和hivecontext的组合,内部封装了soarkcontext,计算还是交由sparkcontext完成的。我们是这样写的:
val sparkSession = SparkSession.builder.master("loacl[1]").appname("ant").getOrCreate()
工程化
配置文件<config> <conf> <sparkMaster>local[1]</sparkMaster> <sparkJarsPath>jar地址</sparkJarsPath> <sparkConf>spark.executor.memory,15G;</sparkConf> <filePath>保存文件地址</filePath> </conf> <table> <!--是否执行 0表示执行--> <isAble>0</isAble> <!--保存文件名称--> <fileName></fileName> <!--主表名称--> <tableName>tcrm_kms</tableName> <!--数据库用户名--> <userName>test</userName> <!--数据库密码--> <passWord>test</passWord> <!--数据库链接--> <url>jdbc:mysql://localhost:3306/test</url> <driver>com.mysql.jdbc.Driver</driver> <!--需要返回的字段 * itemcode itemcode as ic 多个逗号相隔 可以在as后面自定义快搜别名--> <needFields>id,title,keywords,categoryid</needFields> <!-- 保存文件数--> <partitions>15</partitions> <!--筛选条件 线程数等 多个逗号相隔--> <prdeicates>isdelete = 0</prdeicates> <slave> <!--从表名称--> <slaveTableName>tcrm_kms_con</slaveTableName> <!--需要返回的字段 * itemcode itemcode as ic 多个逗号相隔 可以在as后面自定义快搜别名--> <slaveNeedFields>content</slaveNeedFields> <!--筛选条件 多个逗号相隔--> <slavePrdeicates></slavePrdeicates> <!--join left join right join--> <joinType>join</joinType> <combine> <!--on之后条件 如 a = b 至少一个--> <masterFiled>id</masterFiled> <slaveFiled>kms_id</slaveFiled> </combine> </slave> </table> </config>
标签conf中保存基础配置信息,例如spark运行参数配置,格式为key,value;key,value
标签table中为一个运行job,如果是单表数据同步则忽略标签slave中信息,如果是多表join同步,则设置slave信息,主表写在tableName里,从表写在slaveTableName里,joinType字段包括
'inner', 'outer', 'full', 'fullouter','leftouter', 'left', 'rightouter', 'right', 'leftsemi'
sqlcontext支持的语法,标签combine只要是配置关键字on之后的等于条件。标签needFields支持 itemcode as ic这种写法,此时masterFiled需要写成ic而不是itemcode。如果是三表关联,只需要配置slave标签内容即可。
DTO
//基本配置信息 private ConfDO sparkConfDO; //同步job private List<JobDO> jobDOs;
ConfDO
//master private String sparkMaster; //jar包地址 private String sparkJarPath; //spark配置信息 private Map<String, String> sparkConf; //文件保存地址 private String filePath;
JobDO
//保存文件名 如果为空 则为tablename private String fileName; //同步表名 private String tableName; //用户名 private String userName; //密码 private String passWord; //url private String url; //数据库驱动 private String driver; //需要返回的字段 private String needFields; // 返回文件数 private int partitions; //是否执行 private String isAble; //从表信息 private List<SlaveDO> slaveDOs; //分区信息 筛选条件信息 private String[] prdeicates;
SlaveDO
//表名 private String tableName; //返回字段 private String needFields; //比较字段 private List<CombineDO> CombineDOs; //筛选条件 private String[] prdeicates; //关联类型 private String joinType;
CombineDO
//关联主字段 private String masterFiled; //从字段 private String slaveFiled;
配置文件的基本格式就如上所述,后续有需要调整或者优化的就在此基础上进行二次开发即可,方便灵活,满足多表join查询数据同步的配置化工作。
采用saxhadler读取xml,这里需要注意的是sax对大文件的读取会出现数据丢失的现象(原理可以查阅相关资料),大家可以使用stringbuilder,stringbuffer等避免该问题的发生
@Override public void characters(char[] ch, int start, int length) throws SAXException { String string = new String(ch, start, length); // System.out.println(string.replace("\\n", "").trim()); sb.append(string); }
spark java api实现
SQLContext sqlContext = new SQLContext(sc); DataFrame master = sqlContext.read().jdbc(jobDO.getUrl(), jobDO.getTableName(), jobDO.getPrdeicates(), properties); master.registerTempTable(MASTER); if (!CheckUtil.checkListIsNull(jobDO.getSlaveDOs())) { SlaveDO slaveDO = null; String sql = ""; for (int i = 0; i < jobDO.getSlaveDOs().size(); i++) { slaveDO = jobDO.getSlaveDOs().get(i); if (StringUtils.isBlank(slaveDO.getTableName())) { continue; } sqlContext.read().jdbc(jobDO.getUrl(), slaveDO.getTableName(), slaveDO.getPrdeicates(), properties).registerTempTable(SLAVE + i); sql = LoadSql.bulidSql(jobDO.getNeedFields(), 0 == i ? MASTER : MASTER + i, slaveDO, SLAVE + i); //System.out.println(sql); if (StringUtils.isBlank(sql)) { continue; } master = sqlContext.sql(sql); master.registerTempTable(MASTER + (i + 1)); } }
spark 1.6版本最多只支持两张表关联查询,如果多张表关联,则需要将任务进行分解,这也就是为啥会出现根据slave标签进行循环关联在注册表的过程,博主这里已经无力吐槽了~~~真不想吐槽了。。。
sql组装
public static String bulidSql(String masterFields, String masterTable, SlaveDO slaveDO, String slaveTable) { String factor = createFactor(slaveDO.getCombineDOs()); if(StringUtils.isNotBlank(factor)){ return SELECT + BLANK + createFileds(masterFields, MASTER) + COMMA + createFileds(slaveDO.getNeedFields(), SLAVE) + BLANK + FROM + BLANK + masterTable + BLANK + MASTER + BLANK + slaveDO.getJoinType() + BLANK + slaveTable + BLANK + SLAVE + BLANK + factor; } return factor; }
额~~好吧,我自己也都觉得这sql也是无了奈了
ok,得到上述的RDD之后,就可以根据你的业务需要自定义输出啦,想输出啥格式就可以输出啥格式,如果需要schema同样可以。
总结
或许你已经发现了,竟然关联都能实现了,什么distinct,sort,group当然也没啥问题,这些只要对RDD进行二次处理即可。Spark给了我们程序猿一个很好的平台,尤其动态资源分配策略,虽然job启动初期是根据集群资源分配,这个有点扯犊子,但是在运行过程中的动态资源分配还是很给力的,所以博主在想能不能只job运行过程中在给spark任务分配资源呐?现在开源社区有这大量的大数据相关的项目,希望有一天也能看到我的开源项目,突然好期待呀。
至此spark数据同步系列之二已经写完了,博主也准备收拾收拾回家整理屋子了,周末刚经历了一次搬家,对了,在这里万分的感谢博主媳妇及媳妇她爹周末的帮忙,啦啦啦~~
相关文章推荐
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- 我是运营,我没有假期
- Spark随谈——开发指南(译)
- Spark,一种快速数据分析替代方案
- 康诺云推出三款智能硬件产品,为健康管理业务搭建数据池
- DB2数据库的安装
- C#实现把指定数据写入串口
- “传奇”图象数据存储方式
- 修复mysql数据库
- 浅析SQL数据操作语句
- SQLServer 数据导入导出的几种方法小结
- 简述MySQL分片中快速数据迁移
- MySQL数据备份之mysqldump的使用详解
- C#实现窗体间传递数据实例
- C#中的委托数据类型简介
- SQL Server删除表及删除表中数据的方法
- SqlServer2008误操作数据(delete或者update)后恢复数据的方法
- 给你的数据库文件减肥
- Oracle数据更改后出错的解决方法