Flink: Building a Multidimensional Data Cube and a Custom RedisSink
2020-04-04 07:13
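I. Requirements
Given the following data:
User ID, Activity ID, Time, Event Type, Province
u001,A1,2019-09-02 10:10:11,1,Beijing
u001,A1,2019-09-02 14:10:11,1,Beijing
u001,A1,2019-09-02 14:10:11,2,Beijing
u002,A1,2019-09-02 14:10:11,1,Beijing
u002,A2,2019-09-02 14:10:11,1,Beijing
u002,A2,2019-09-02 15:10:11,1,Beijing
u002,A2,2019-09-02 15:10:11,2,Beijing
Event types: 0 = impression, 1 = click, 2 = participation
Build a data cube over the dimensions above (activity, event type, date, province) and write the resulting counts to Redis.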
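To make the target concrete, the sketch below computes the three groupings used later in this article in plain Java with an in-memory map, with no Flink involved. The class name `CubeSketch`, the `|` separator, and truncating the timestamp to a date are assumptions made for illustration; they are not part of the original job.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CubeSketch {

    // Sample records from the requirements: uid, activity, time, type, province.
    public static final String[] SAMPLE = {
        "u001,A1,2019-09-02 10:10:11,1,Beijing",
        "u001,A1,2019-09-02 14:10:11,1,Beijing",
        "u001,A1,2019-09-02 14:10:11,2,Beijing",
        "u002,A1,2019-09-02 14:10:11,1,Beijing",
        "u002,A2,2019-09-02 14:10:11,1,Beijing",
        "u002,A2,2019-09-02 15:10:11,1,Beijing",
        "u002,A2,2019-09-02 15:10:11,2,Beijing"
    };

    // Count records per cube cell. A cell is identified by the joined
    // values of the dimensions it groups by.
    public static Map<String, Long> buildCube(String[] records) {
        Map<String, Long> cube = new LinkedHashMap<>();
        for (String record : records) {
            String[] f = record.split(",");
            String activity = f[1];
            String day = f[2].substring(0, 10); // keep yyyy-MM-dd only
            String type = f[3];
            String province = f[4];

            // Three groupings, from coarse to fine.
            cube.merge(activity + "|" + type, 1L, Long::sum);
            cube.merge(activity + "|" + type + "|" + day, 1L, Long::sum);
            cube.merge(activity + "|" + type + "|" + day + "|" + province, 1L, Long::sum);
        }
        return cube;
    }

    public static void main(String[] args) {
        buildCube(SAMPLE).forEach((cell, count) -> System.out.println(cell + " -> " + count));
    }
}
```

For the sample data this gives 3 clicks and 1 participation for A1, and 2 clicks and 1 participation for A2, at every level of the cube, because all records share one day and one province.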
II. Code Implementation
1. Main Code
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @date: 2020/3/13 19:44
 * @site: www.ianlou.cn
 * @author: lekko 六水
 * @qq: 496208110
 * @description: Build a data cube from the activity data above by grouping on multiple dimensions
 */
public class ActivityCountWithMultiDimension {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // MySinkToRedis reads its Redis settings from the global job parameters,
        // so they must be registered before the job starts.
        // Assumption: args[0] is the path to a properties file that contains
        // redis.host, redis.password and optionally redis.port and redis.db.
        env.getConfig().setGlobalJobParameters(ParameterTool.fromPropertiesFile(args[0]));

        // Sample record: u001,A1,2019-09-02 10:10:11,1,Beijing
        DataStreamSource<String> lines = env.socketTextStream("linux01", 8888);

        // Step 1: read the data and split each line into fields
        SingleOutputStreamOperator<ActivityBean> beanDS = lines.map(new MapFunction<String, ActivityBean>() {
            @Override
            public ActivityBean map(String line) throws Exception {
                String[] spArr = line.split(",");
                String uid = spArr[0];
                String act = spArr[1];
                // Keep only the date part (yyyy-MM-dd) so that the daily
                // groupings aggregate per day rather than per second.
                String dt = spArr[2].substring(0, 10);
                String type = spArr[3];
                String province = spArr[4];
                return ActivityBean.of(uid, act, dt, type, province);
            }
        });

        // Step 2: aggregate by key to build the multidimensional data cube.
        // The key names must match the field names of ActivityBean.
        SingleOutputStreamOperator<ActivityBean> res1 = beanDS.keyBy("avtId", "type").sum("count");
        SingleOutputStreamOperator<ActivityBean> res2 = beanDS.keyBy("avtId", "type", "dt").sum("count");
        SingleOutputStreamOperator<ActivityBean> res3 = beanDS.keyBy("avtId", "type", "dt", "province").sum("count");

        /**
         * Step 3: write the results to Redis through the custom sink
         *   (1) the event type field is used as the inner key (the hash field)
         *   (2) all other grouping fields are concatenated into the outer key
         *   (3) the aggregated count is stored as the value
         */
        res1.map(new MapFunction<ActivityBean, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(ActivityBean bean) throws Exception {
                // Use the activity ID, which is a grouping key, not the user ID.
                return Tuple3.of(Constant.ACTIVITY_COUNT + "-" + bean.getAvtId(),
                        bean.getType(), bean.getCount() + "");
            }
        }).addSink(new MySinkToRedis());

        res2.map(new MapFunction<ActivityBean, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(ActivityBean bean) throws Exception {
                return Tuple3.of(Constant.DAILY_ACTIVITY_COUNT + "-" + bean.getAvtId() + "-" + bean.getDt(),
                        bean.getType(), bean.getCount() + "");
            }
        }).addSink(new MySinkToRedis());

        res3.map(new MapFunction<ActivityBean, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(ActivityBean bean) throws Exception {
                return Tuple3.of(Constant.PROVINCE_DAILY_ACTIVITY_COUNT + "-" + bean.getAvtId()
                                + "-" + bean.getDt() + "-" + bean.getProvince(),
                        bean.getType(), bean.getCount() + "");
            }
        }).addSink(new MySinkToRedis());

        env.execute("ActivityCountWithMultiDimension");
    }
}
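A keyed rolling sum emits an updated running total for every incoming record, and each write to a Redis hash field overwrites the previous value, so the hash always holds the latest count. The sketch below simulates that behaviour with nested maps standing in for Redis. The class `RedisHashLayoutSketch` and its methods are hypothetical names for illustration, and only the coarsest grouping (activity and event type) is modelled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class RedisHashLayoutSketch {

    // Stand-in for Redis: outer key -> (field -> value), like a Redis hash.
    private final Map<String, Map<String, String>> store = new TreeMap<>();

    // Running totals per (activity, type), like a keyed rolling sum.
    private final Map<String, Long> running = new HashMap<>();

    // Same effect as HSET key field value: the field is created or overwritten.
    public void hset(String key, String field, String value) {
        store.computeIfAbsent(key, k -> new TreeMap<>()).put(field, value);
    }

    // Process one event: update the running total, then write it out.
    public void onEvent(String activityId, String type) {
        long total = running.merge(activityId + "|" + type, 1L, Long::sum);
        hset("ACTIVITY_COUNT-" + activityId, type, Long.toString(total));
    }

    public Map<String, Map<String, String>> snapshot() {
        return store;
    }

    public static void main(String[] args) {
        RedisHashLayoutSketch sketch = new RedisHashLayoutSketch();
        sketch.onEvent("A1", "1");
        sketch.onEvent("A1", "1"); // overwrites field "1" with the new total 2
        sketch.onEvent("A1", "2");
        sketch.onEvent("A2", "1");
        // prints {ACTIVITY_COUNT-A1={1=2, 2=1}, ACTIVITY_COUNT-A2={1=1}}
        System.out.println(sketch.snapshot());
    }
}
```

Putting the event type in the hash field means a single read of one outer key returns the counts of all event types for that activity.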
2. ActivityBean
When a record has many fields, a Tuple may not have room for them, and repeating the type parameters everywhere is tedious. In that case, define a custom JavaBean instead.
- Give the bean a count field initialized to 1, which is summed later to produce the counts
- Following the example of Tuple, provide a static of factory method
public class ActivityBean {

    private String uid;
    private String avtId;
    private String dt;
    private String type;
    private String province;

    // Summed during grouping to produce the aggregate count
    private long count = 1L;

    // Flink needs a public no-argument constructor to treat this class as a POJO
    public ActivityBean() {
    }

    public ActivityBean(String uid, String avtId, String dt, String type, String province) {
        this.uid = uid;
        this.avtId = avtId;
        this.dt = dt;
        this.type = type;
        this.province = province;
    }

    // Static factory method modelled on Tuple.of
    public static ActivityBean of(String uid, String avtId, String dt, String type, String province) {
        return new ActivityBean(uid, avtId, dt, type, province);
    }

    @Override
    public String toString() {
        return "ActivityBean{" +
                "uid='" + uid + '\'' +
                ", avtId='" + avtId + '\'' +
                ", dt='" + dt + '\'' +
                ", type='" + type + '\'' +
                ", province='" + province + '\'' +
                ", count=" + count +
                '}';
    }

    public String getUid() { return uid; }

    public void setUid(String uid) { this.uid = uid; }

    public String getAvtId() { return avtId; }

    public void setAvtId(String avtId) { this.avtId = avtId; }

    public String getDt() { return dt; }

    public void setDt(String dt) { this.dt = dt; }

    public String getType() { return type; }

    public void setType(String type) { this.type = type; }

    public String getProvince() { return province; }

    public void setProvince(String province) { this.province = province; }

    public long getCount() { return count; }

    public void setCount(long count) { this.count = count; }
}
3. Constant: Custom Constants
public class Constant {

    // Prefixes of the Redis outer keys, one per level of the cube
    public static final String ACTIVITY_COUNT = "ACTIVITY_COUNT";

    public static final String DAILY_ACTIVITY_COUNT = "DAILY_ACTIVITY_COUNT";

    public static final String PROVINCE_DAILY_ACTIVITY_COUNT = "PROVINCE_DAILY_ACTIVITY_COUNT";
}
4. Custom RedisSink
- Jedis is a Java client for Redis. It wraps the Redis commands as Java methods and also provides connection pool management.
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;

public class MySinkToRedis extends RichSinkFunction<Tuple3<String, String, String>> {

    // Marked transient so that the Jedis client is not serialized with the sink
    private transient Jedis jedis;

    // Create the Redis connection
    @Override
    public void open(Configuration parameters) throws Exception {
        // 1. Get the global job parameters (they must be registered in the main program)
        ParameterTool params = (ParameterTool) getRuntimeContext()
                .getExecutionConfig()
                .getGlobalJobParameters();

        // 2. Read the relevant settings
        String host = params.getRequired("redis.host");
        String password = params.getRequired("redis.password");
        int port = params.getInt("redis.port", 6379);
        int db = params.getInt("redis.db", 0);

        // 3. Open the Jedis connection
        Jedis client = new Jedis(host, port);
        client.auth(password);
        client.select(db);
        this.jedis = client;
    }

    // Store a value in Redis: f0 = outer key, f1 = hash field, f2 = value
    @Override
    public void invoke(Tuple3<String, String, String> input, Context context) throws Exception {
        if (!jedis.isConnected()) {
            jedis.connect();
        }
        jedis.hset(input.f0, input.f1, input.f2);
    }

    @Override
    public void close() throws Exception {
        // open() may have failed before the client was created
        if (jedis != null) {
            jedis.close();
        }
    }
}
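The sink treats `redis.host` and `redis.password` as required settings and falls back to defaults for `redis.port` (6379) and `redis.db` (0). As a rough illustration of that lookup logic, the sketch below reproduces it with `java.util.Properties` instead of Flink's `ParameterTool`. The class `RedisSettings` is a hypothetical helper, and the host and password values are placeholders.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class RedisSettings {

    public final String host;
    public final String password;
    public final int port;
    public final int db;

    private RedisSettings(String host, String password, int port, int db) {
        this.host = host;
        this.password = password;
        this.port = port;
        this.db = db;
    }

    // Required keys fail fast when missing; optional keys fall back to defaults.
    public static RedisSettings from(Properties props) {
        String host = required(props, "redis.host");
        String password = required(props, "redis.password");
        int port = Integer.parseInt(props.getProperty("redis.port", "6379"));
        int db = Integer.parseInt(props.getProperty("redis.db", "0"));
        return new RedisSettings(host, password, port, db);
    }

    private static String required(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing required setting: " + key);
        }
        return value;
    }

    public static void main(String[] args) throws IOException {
        // Placeholder values for illustration only.
        Properties props = new Properties();
        props.load(new StringReader("redis.host=localhost\nredis.password=secret\n"));
        RedisSettings settings = RedisSettings.from(props);
        // prints localhost:6379 db=0
        System.out.println(settings.host + ":" + settings.port + " db=" + settings.db);
    }
}
```

Failing fast on a missing host or password surfaces a configuration mistake when the sink opens, rather than on the first write.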