您的位置:首页 > 大数据

大数据实战【千亿级数仓】阶段二

2020-05-11 04:06 776 查看

写在前面: 博主是一名软件工程系大数据应用开发专业大二的学生,昵称来源于《爱丽丝梦游仙境》中的Alice和自己的昵称。作为一名互联网小白,

写博客一方面是为了记录自己的学习历程,一方面是希望能够帮助到很多和自己一样处于起步阶段的萌新
。由于水平有限,博客中难免会有一些错误,有纰漏之处恳请各位大佬不吝赐教!个人小站:http://alices.ibilibili.xyz/ , 博客主页:https://alice.blog.csdn.net/
尽管当前水平可能不及各位大佬,但我还是希望自己能够做得更好,因为
一天的生活就是一生的缩影
。我希望
在最美的年华,做最好的自己

        本篇博客,博主为大家带来的是大数据实战【千亿级数仓】阶段二的内容。

        通过之前的预告,先来回顾一下我们需要掌握的技能。

  • 学习、掌握kettle的使用、使用kettle将项目需求所需的数据在MySQL同步到Hive。

  • 使用sqoop,将剩余的数据在MySQL同步到Hive。

        关于Kettle的具体使用情况,体贴的博主就不在这里赘述太多了,毕竟之前关于Kettle的使用说明的博客可花了不少心思。

        关于Kettle的详情,感兴趣的朋友可以进入👉Kettle专栏

        接下来讲的是,如何使用Kettle将项目所需要的数据从MySQL同步到Hive中。

        首先我们将快速在MySQL中创建好原始表的sql文件复制到DataGrip的新建文件夹下

        然后选中右键执行

        执行完毕,我们集群的MySQL下就会创建一个新的数据库itcast_shop,数据库下又会有诸多已经创建好的数据表

这些表正是在阶段一中提到的那八十多个表


然而,我们本次项目中真正用到的就只有这里面中的10个

        现在表全在MySQL中了,我们要做的就是使用Kettle将这10个表同步到Hive中。然后将剩下的表用Sqoop导入到Hive。

        这里肯定就有朋友要问了,为什么不全部都用Sqoop同步,还要分两种方式来同步数据,不是自找麻烦么?

        确实没错,但这里使用Kettle是为了让我们对kettle的使用更熟练,毕竟Kettle的功能有多强大,相信看过博主前面的介绍kettle博文的朋友都知道。

        因为使用Kettle导入10个表的数据到Hive,因此我们需要先在Hive中将这些数据表先创建出来。

        执行下面的建表语句

-- 创建ods层订单表
drop table if exists `itcast_ods`.`itcast_orders`;
create EXTERNAL table `itcast_ods`.`itcast_orders`(
orderId            bigint,
orderNo            string,
shopId             bigint,
userId             bigint,
orderStatus        bigint,
goodsMoney         double,
deliverType        bigint,
deliverMoney       double,
totalMoney         double,
realTotalMoney     double,
payType            bigint,
isPay              bigint,
areaId             bigint,
userAddressId      bigint,
areaIdPath         string,
userName           string,
userAddress        string,
userPhone          string,
orderScore         bigint,
isInvoice          bigint,
invoiceClient      string,
orderRemarks       string,
orderSrc           bigint,
needPay            double,
payRand            bigint,
orderType          bigint,
isRefund           bigint,
isAppraise         bigint,
cancelReason       bigint,
rejectReason       bigint,
rejectOtherReason  string,
isClosed           bigint,
goodsSearchKeys    string,
orderunique        string,
receiveTime        string,
deliveryTime       string,
tradeNo            string,
dataFlag           bigint,
createTime         string,
settlementId       bigint,
commissionFee      double,
scoreMoney         double,
useScore           bigint,
orderCode          string,
extraJson          string,
orderCodeTargetId  bigint,
noticeDeliver      bigint,
invoiceJson        string,
lockCashMoney      double,
payTime            string,
isBatch            bigint,
totalPayFee        bigint
)
partitioned by (dt string)
STORED AS PARQUET;

-- 创建ods层订单明细表
drop table if exists `itcast_ods`.`itcast_order_goods`;
create EXTERNAL table `itcast_ods`.`itcast_order_goods`(
ogId            bigint,
orderId         bigint,
goodsId         bigint,
goodsNum        bigint,
goodsPrice      double,
payPrice        double,
goodsSpecId     bigint,
goodsSpecNames  string,
goodsName       string,
goodsImg        string,
extraJson       string,
goodsType       bigint,
commissionRate  double,
goodsCode       string,
promotionJson   string,
createtime      string
)
partitioned by (dt string)
STORED AS PARQUET;

-- 创建ods层店铺表
drop table if exists `itcast_ods`.`itcast_shops`;
create EXTERNAL table `itcast_ods`.`itcast_shops`(
shopId             bigint,
shopSn             string,
userId             bigint,
areaIdPath         string,
areaId             bigint,
isSelf             bigint,
shopName           string,
shopkeeper         string,
telephone          string,
shopCompany        string,
shopImg            string,
shopTel            string,
shopQQ             string,
shopWangWang       string,
shopAddress        string,
bankId             bigint,
bankNo             string,
bankUserName       string,
isInvoice          bigint,
invoiceRemarks     string,
serviceStartTime   bigint,
serviceEndTime     bigint,
freight            bigint,
shopAtive          bigint,
shopStatus         bigint,
statusDesc         string,
dataFlag           bigint,
createTime         string,
shopMoney          double,
lockMoney          double,
noSettledOrderNum  bigint,
noSettledOrderFee  double,
paymentMoney       double,
bankAreaId         bigint,
bankAreaIdPath     string,
applyStatus        bigint,
applyDesc          string,
applyTime          string,
applyStep          bigint,
shopNotice         string,
rechargeMoney      double,
longitude          double,
latitude           double,
mapLevel           bigint,
BDcode             string,
modifyTime         string
)
partitioned by (dt string)
STORED AS PARQUET;

-- 创建ods层商品表
drop table if exists `itcast_ods`.`itcast_goods`;
create EXTERNAL table `itcast_ods`.`itcast_goods`(
goodsId              bigint,
goodsSn              string,
productNo            string,
goodsName            string,
goodsImg             string,
shopId               bigint,
goodsType            bigint,
marketPrice          double,
shopPrice            double,
warnStock            bigint,
goodsStock           bigint,
goodsUnit            string,
goodsTips            string,
isSale               bigint,
isBest               bigint,
isHot                bigint,
isNew                bigint,
isRecom              bigint,
goodsCatIdPath       string,
goodsCatId           bigint,
shopCatId1           bigint,
shopCatId2           bigint,
brandId              bigint,
goodsDesc            string,
goodsStatus          bigint,
saleNum              bigint,
saleTime             string,
visitNum             bigint,
appraiseNum          bigint,
isSpec               bigint,
gallery              string,
goodsSeoKeywords     string,
illegalRemarks       string,
dataFlag             bigint,
createTime           string,
isFreeShipping       bigint,
goodsSerachKeywords  string,
modifyTime           string
)
partitioned by (dt string)
STORED AS PARQUET;

-- 创建ods层组织机构表
drop table `itcast_ods`.`itcast_org`;
create EXTERNAL table `itcast_ods`.`itcast_org`(
orgId        bigint,
parentId     bigint,
orgName      string,
orgLevel     bigint,
managerCode  string,
isdelete     bigint,
createTime   string,
updateTime   string,
isShow       bigint,
orgType      bigint
)
partitioned by (dt string)
STORED AS PARQUET;

-- 创建ods层商品分类表
drop table if exists `itcast_ods`.`itcast_goods_cats`;
create EXTERNAL table `itcast_ods`.`itcast_goods_cats`(
catId               bigint,
parentId            bigint,
catName             string,
isShow              bigint,
isFloor             bigint,
catSort             bigint,
dataFlag            bigint,
createTime          string,
commissionRate      double,
catImg              string,
subTitle            string,
simpleName          string,
seoTitle            string,
seoKeywords         string,
seoDes              string,
catListTheme        string,
detailTheme         string,
mobileCatListTheme  string,
mobileDetailTheme   string,
wechatCatListTheme  string,
wechatDetailTheme   string,
cat_level           bigint
)
partitioned by (dt string)
STORED AS PARQUET;

-- 创建ods层用户表
drop table if exists `itcast_ods`.`itcast_users`;
create EXTERNAL table `itcast_ods`.`itcast_users`(
userId          bigint,
loginName       string,
loginSecret     bigint,
loginPwd        string,
userType        bigint,
userSex         bigint,
userName        string,
trueName        string,
brithday        string,
userPhoto       string,
userQQ          string,
userPhone       string,
userEmail       string,
userScore       bigint,
userTotalScore  bigint,
lastIP          string,
lastTime        string,
userFrom        bigint,
userMoney       double,
lockMoney       double,
userStatus      bigint,
dataFlag        bigint,
createTime      string,
payPwd          string,
rechargeMoney   double,
isInform        bigint
)
partitioned by (dt string)
STORED AS PARQUET;

-- 创建ods层退货表
drop table if exists `itcast_ods`.`itcast_order_refunds`;
create EXTERNAL table `itcast_ods`.`itcast_order_refunds`(
id                bigint,
orderId           bigint,
goodsId           bigint,
refundTo          bigint,
refundReson       bigint,
refundOtherReson  string,
backMoney         double,
refundTradeNo     string,
refundRemark      string,
refundTime        string,
shopRejectReason  string,
refundStatus      bigint,
createTime        string
)
partitioned by (dt string)
STORED AS PARQUET;

-- 创建ods层地址表
drop table if exists `itcast_ods`.`itcast_user_address`;
create EXTERNAL table `itcast_ods`.`itcast_user_address`(
addressId    bigint,
userId       bigint,
userName     string,
otherName    string,
userPhone    string,
areaIdPath   string,
areaId       bigint,
userAddress  string,
isDefault    bigint,
dataFlag     bigint,
createTime   string
)
partitioned by (dt string)
STORED AS PARQUET;

-- 创建ods层支付方式表
drop table if exists `itcast_ods`.`itcast_payments`;
create EXTERNAL table `itcast_ods`.`itcast_payments`(
id         bigint,
payCode    string,
payName    string,
payDesc    string,
payOrder   bigint,
payConfig  string,
enabled    bigint,
isOnline   bigint,
payFor     string
)
partitioned by (dt string)
STORED AS PARQUET;

执行完毕,此时数据库中就创建好了10个空的表

接下来我们就需要通过Kettle读取MySQL中的数据,输出到各个hive表存储在HDFS的路径下的parquent文件中即可。

相信看到这里的朋友,对于Kettle已经相当熟练了,所以我这里就不再像第一次教学Kettle那样细讲了。

我们根据需求,需要使用到表输入组件,字段选择(根据业务添加),parquent输出组件。

我们将所需要的组件连接起来,因为需要同时同步10个表的数据,所以我们也构造了10个"线路"

组件连接好了之后,让我们来看看如何单独设置每个的内容

首先双击空白处,我们需要设置一个kettle中的参数,方便我们调用,用来做数据分区使用


然后就可以进行设置表的输入了,需要注意的地方有如下四个

如果不放心,还可以选择预览数据

字段选择中,如果没有其他的特殊情况,我们这里默认就获取字段

然后我们就可以设置parquent文件的输出了

需要注意位置要设置成HDFS,然后在预览中选择需要导入Hive表在HDFS上的元数据的路径。
另外建议勾选上,覆盖已存在文件,这样我们就反复运行程序而无需担心每次都要换个输出路径了~
默认也都是获取所有的字段,然后我们就可以设置压缩格式

Snappy
,就可以点击确定了。

上面演示的是一个表从MySQL读取到输出Parquent的过程,因为这里我们涉及到了十个表,所以需要操作十次…

待到10个表的流程都完成,直接运行然后在命令行上修复分区数据也是一样的

但是都操作到这里了,我们还是换一种

优雅
的方式

首先我们新建一个

作业


作业
界面,我们获取到这些组件,并连接起来

Start 我们无需操作,后面挂的小锁代表着无需任何条件即可执行
关于转换组件的设置,是一个重点
这里的路径需要设置成我们前面已经创建的转换文件在本地的路径

接着就在SQL组件中,连接上hive,并编写需要执行的SQL脚本

待到设置完毕,我们就可以运行这个Job了

正常情况下,我们可以在执行完毕之后,查询之前创建的Hive数据表,可以发现10张表都已经有了数据

        Kettle如何实现MySQL同步到Hive已经说完了。下面我们来整点“刺激”的!

        关于全量导入mysql表数据到Hive,有以下两种方法:

        首先,进入到Sqoop的安装目录下,

cd /export/servers/sqoop-1.4.6.bin__hadoop-2.0.4-alpha

方式一:先复制表结构到hive中再导入数据

        将关系型数据的表结构复制到hive中

bin/sqoop create-hive-table \
--connect jdbc:mysql://节点IP:3306/mysql数据库 \
--table mysql数据表名 \
--username mysql账户 \
--password mysql密码 \
--hive-table 数据库.需要输出的表名

        从关系数据库导入文件到hive中

bin/sqoop import \
--connect jdbc:mysql://节点IP:3306/mysql数据库\
--username mysql账户 \
--password mysql密码 \
--table mysql数据表 \
--hive-table 数据库.需要输出的表名 \
--hive-import \
--m 1

方式二:直接复制表结构数据到hive中

bin/sqoop import \
--connect jdbc:mysql://节点IP:3306/mysql数据库\
--username mysql账户 \
--password mysql密码 \
--table mysql数据表 \
--hive-import \
--m 1 \
--hive-database hive数据库;

如果用方式二想把数据导出到分区表,可以用下面这种方式

sqoop import \
--connect jdbc:mysql://IP节点:3306/mysql数据库 \
--username mysql账户 \
--password mysql密码 \
--where "查询条件" \
--target-dir /user/hive/warehouse/xxx输出路径/ \
--table mysql数据表--m 1

注意:

        用这两种方法都可以实现从MySQL同步数据到Hive,区别就在于方式二输出的hive表名与mysql输入的表名是一样的,方式一可以自己定义hive输出表的名字

小结

        大数据实战【千亿级数仓】阶段二需要大家熟练Kettle的基本使用,项目所需数据的从MySQL到Hive同步以及使用Sqoop同步其他数据。

        如果以上过程中出现了任何的纰漏错误,烦请大佬们指正😅

        受益的朋友或对大数据技术感兴趣的伙伴记得点赞关注支持一波🙏

Alice菌 原创文章 278获赞 2910访问量 56万+ 关注 私信
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐