
Flink: Building a Multi-Dimensional Data Cube and a Custom RedisSink

2020-04-04 07:13


I. Requirements

Given the following data:

User ID,Activity ID,Time,Event type,Province
u001,A1,2019-09-02 10:10:11,1,北京市
u001,A1,2019-09-02 14:10:11,1,北京市
u001,A1,2019-09-02 14:10:11,2,北京市
u002,A1,2019-09-02 14:10:11,1,北京市
u002,A2,2019-09-02 14:10:11,1,北京市
u002,A2,2019-09-02 15:10:11,1,北京市
u002,A2,2019-09-02 15:10:11,2,北京市
Event types:
0: impression (exposure)
1: click
2: participation

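Build a data cube over the dimensions above and write the resulting event counts to Redis. Concretely, the job counts events per (activity, event type), per (activity, event type, day), and per (activity, event type, day, province).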
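The sketch below makes the target concrete. It is plain Java with no Flink, and it computes the three groupings over the seven sample rows in one batch. The class and method names are illustrative. It assumes the time column always has the form yyyy-MM-dd HH:mm:ss and truncates it to the day for the daily dimensions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Batch version of the three cube groupings over the sample rows (plain Java, no Flink).
class CubeCountSketch {

    static final List<String> SAMPLE = Arrays.asList(
            "u001,A1,2019-09-02 10:10:11,1,北京市",
            "u001,A1,2019-09-02 14:10:11,1,北京市",
            "u001,A1,2019-09-02 14:10:11,2,北京市",
            "u002,A1,2019-09-02 14:10:11,1,北京市",
            "u002,A2,2019-09-02 14:10:11,1,北京市",
            "u002,A2,2019-09-02 15:10:11,1,北京市",
            "u002,A2,2019-09-02 15:10:11,2,北京市");

    // Counts rows per key; keyFields are column indexes whose values are joined with "-"
    static Map<String, Long> countBy(List<String> lines, int... keyFields) {
        Map<String, Long> counts = new TreeMap<>(); // sorted, so the printed order is stable
        for (String line : lines) {
            String[] f = line.split(",");
            f[2] = f[2].split(" ")[0]; // assumption: "yyyy-MM-dd HH:mm:ss", keep only the day
            StringBuilder key = new StringBuilder();
            for (int i : keyFields) {
                if (key.length() > 0) {
                    key.append('-');
                }
                key.append(f[i]);
            }
            counts.merge(key.toString(), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Columns: 0 uid, 1 activity, 2 time, 3 type, 4 province
        System.out.println(countBy(SAMPLE, 1, 3));       // (activity, type)
        System.out.println(countBy(SAMPLE, 1, 3, 2));    // (activity, type, day)
        System.out.println(countBy(SAMPLE, 1, 3, 2, 4)); // (activity, type, day, province)
    }
}
```

The first grouping prints {A1-1=3, A1-2=1, A2-1=2, A2-2=1}. Activity A1 has three clicks and one participation, and A2 has two clicks and one participation. All sample rows fall on 2019-09-02 in 北京市, so the finer groupings show the same counts under longer keys.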

II. Implementation

1. Main program

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @date: 2020/3/13 19:44
 * @site: www.ianlou.cn
 * @author: lekko 六水
 * @qq: 496208110
 * @description: Builds a data cube from the activity events above by grouping on several dimension combinations
 */
public class ActivityCountWithMultiDimension {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Register the program arguments (--redis.host, --redis.port, ...) as global job parameters;
        // MySinkToRedis reads its connection settings from them in open()
        env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));

        // Sample input line: u001,A1,2019-09-02 10:10:11,1,北京市
        DataStreamSource<String> lines = env.socketTextStream("linux01", 8888);

        // I. Read each line and split it into an ActivityBean
        SingleOutputStreamOperator<ActivityBean> beanDS =
                lines.map(new MapFunction<String, ActivityBean>() {

                    @Override
                    public ActivityBean map(String line) throws Exception {
                        String[] spArr = line.split(",");
                        String uid = spArr[0];
                        String act = spArr[1];
                        // Keep only the yyyy-MM-dd part so that the "daily" dimensions group by day
                        String dt = spArr[2].split(" ")[0];
                        String type = spArr[3];
                        String province = spArr[4];

                        return ActivityBean.of(uid, act, dt, type, province);
                    }
                });

        // II. Aggregate on several dimension combinations to build the data cube.
        // Key names must match the ActivityBean field names (avtId, dt)
        SingleOutputStreamOperator<ActivityBean> res1 = beanDS.keyBy("avtId", "type").sum("count");
        SingleOutputStreamOperator<ActivityBean> res2 = beanDS.keyBy("avtId", "type", "dt").sum("count");
        SingleOutputStreamOperator<ActivityBean> res3 = beanDS.keyBy("avtId", "type", "dt", "province").sum("count");

        /*
         * III. Write the results to Redis through the custom sink:
         *   (1) the event type is the hash field (the "small key");
         *   (2) the other grouping fields are joined into the Redis key (the "big key");
         *   (3) the aggregated count is stored as the value.
         * The key uses the activity ID: after keyBy/sum, uid only holds the first record's user.
         */
        res1.map(new MapFunction<ActivityBean, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(ActivityBean bean) throws Exception {
                return Tuple3.of(Constant.ACTIVITY_COUNT + "-" + bean.getAvtId(), bean.getType(),
                        bean.getCount() + "");
            }
        }).addSink(new MySinkToRedis());

        res2.map(new MapFunction<ActivityBean, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(ActivityBean bean) throws Exception {
                return Tuple3.of(Constant.DAILY_ACTIVITY_COUNT + "-" + bean.getAvtId() + "-" + bean.getDt(),
                        bean.getType(),
                        bean.getCount() + "");
            }
        }).addSink(new MySinkToRedis());

        res3.map(new MapFunction<ActivityBean, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> map(ActivityBean bean) throws Exception {
                return Tuple3.of(Constant.PROVINCE_DAILY_ACTIVITY_COUNT + "-" + bean.getAvtId() + "-" + bean.getDt() +
                        "-" + bean.getProvince(), bean.getType(), bean.getCount() + "");
            }
        }).addSink(new MySinkToRedis());

        env.execute("ActivityCountWithMultiDimension");
    }
}
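This note explains why the sink can simply overwrite hash fields. keyBy(...).sum("count") is a rolling aggregation: every incoming record produces an updated total for its key, so the sink receives 1, 2, 3, ... for the same field. Each HSET replaces the previous value, which leaves the latest total in Redis. The sketch below imitates this for the (activity, event type) grouping in plain Java. The HashMap standing in for Redis and all method names are hypothetical, and parallelism and failure recovery are ignored.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation of keyBy(avtId, type).sum("count") feeding HSET (no Flink, no Redis).
class RunningSumToHashSketch {

    // Hypothetical stand-in for Redis hashes: key -> (field -> value)
    static final Map<String, Map<String, String>> FAKE_REDIS = new HashMap<>();

    // Running total per (activity, type), playing the role of Flink's keyed state
    static final Map<String, Long> STATE = new HashMap<>();

    static void reset() {
        FAKE_REDIS.clear();
        STATE.clear();
    }

    // Like HSET: create the hash if needed, then overwrite the field
    static void hset(String key, String field, String value) {
        FAKE_REDIS.computeIfAbsent(key, k -> new HashMap<>()).put(field, value);
    }

    // One incoming event: update the running total, then emit it to the "sink"
    static void onRecord(String activityId, String type) {
        long total = STATE.merge(activityId + "|" + type, 1L, Long::sum);
        hset("ACTIVITY_COUNT-" + activityId, type, Long.toString(total));
    }

    public static void main(String[] args) {
        // (activity, type) of the seven sample rows, in arrival order
        String[][] events = {
                {"A1", "1"}, {"A1", "1"}, {"A1", "2"}, {"A1", "1"},
                {"A2", "1"}, {"A2", "1"}, {"A2", "2"}};
        for (String[] e : events) {
            onRecord(e[0], e[1]);
        }
        System.out.println(FAKE_REDIS.get("ACTIVITY_COUNT-A1")); // {1=3, 2=1}
        System.out.println(FAKE_REDIS.get("ACTIVITY_COUNT-A2")); // {1=2, 2=1}
    }
}
```

After the seven sample events, the overwritten fields hold the same totals that a one-shot batch count would produce.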

2. ActivityBean

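When a record is split into many fields, a Tuple may run out of slots (Flink's Java tuples go up to Tuple25). Repeating the field types in every generic signature is also tedious. Defining a custom JavaBean avoids both problems. Flink must treat the class as a POJO for keying by field name to work. That requires a public class with a public no-argument constructor, where each field is either public or has a getter and setter.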

  • Give the count field a default value of 1, so that summing it per key counts the records.
  • Mimic Tuple with a static factory method, of().
public class ActivityBean {
    private String uid;
    private String avtId;
    private String dt;
    private String type;
    private String province;
    // Each record counts as 1; sum("count") per key then yields the number of records
    private long count = 1L;

    // Flink POJOs need a public no-argument constructor
    public ActivityBean() {
    }

    public ActivityBean(String uid, String avtId, String dt, String type, String province) {
        this.uid = uid;
        this.avtId = avtId;
        this.dt = dt;
        this.type = type;
        this.province = province;
    }

    // Static factory method modeled on Tuple.of
    public static ActivityBean of(String uid, String avtId, String dt, String type, String province) {
        return new ActivityBean(uid, avtId, dt, type, province);
    }

    @Override
    public String toString() {
        return "ActivityBean{" +
                "uid='" + uid + '\'' +
                ", avtId='" + avtId + '\'' +
                ", dt='" + dt + '\'' +
                ", type='" + type + '\'' +
                ", province='" + province + '\'' +
                ", count=" + count +
                '}';
    }

    public String getUid() {
        return uid;
    }
    public void setUid(String uid) {
        this.uid = uid;
    }
    public String getAvtId() {
        return avtId;
    }
    public void setAvtId(String avtId) {
        this.avtId = avtId;
    }
    public String getDt() {
        return dt;
    }
    public void setDt(String dt) {
        this.dt = dt;
    }
    public String getType() {
        return type;
    }
    public void setType(String type) {
        this.type = type;
    }
    public String getProvince() {
        return province;
    }
    public void setProvince(String province) {
        this.province = province;
    }
    public long getCount() {
        return count;
    }
    public void setCount(long count) {
        this.count = count;
    }
}
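The count = 1 trick can be seen without Flink. If every record carries 1, summing that field per key gives the number of records. The sketch below models a rolling sum over the records of one key. It assumes, as we understand Flink's sum() on a POJO, that the result keeps the non-summed fields (such as uid) of the first record seen for the key. Under that assumption, the uid in the aggregated output says nothing about the group as a whole. Bean, of, sum and rollingSum are illustrative names, not the article's classes.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model of a rolling sum("count") over records that share one key.
class CountFieldSketch {

    // Hypothetical, trimmed-down stand-in for ActivityBean
    static class Bean {
        String uid;
        String avtId;
        String type;
        long count = 1L; // every record contributes 1

        // Static factory in the style of Tuple.of
        static Bean of(String uid, String avtId, String type) {
            Bean b = new Bean();
            b.uid = uid;
            b.avtId = avtId;
            b.type = type;
            return b;
        }

        Bean copy() {
            Bean b = of(uid, avtId, type);
            b.count = count;
            return b;
        }
    }

    // Assumed sum semantics: keep the accumulator's other fields, add the counts
    static Bean sum(Bean acc, Bean next) {
        Bean result = acc.copy();
        result.count = acc.count + next.count;
        return result;
    }

    // Folds the records of one key in arrival order; returns null for an empty list
    static Bean rollingSum(List<Bean> sameKey) {
        Bean acc = null;
        for (Bean b : sameKey) {
            acc = (acc == null) ? b.copy() : sum(acc, b);
        }
        return acc;
    }

    public static void main(String[] args) {
        // The three (A1, click) rows of the sample data
        Bean r = rollingSum(Arrays.asList(
                Bean.of("u001", "A1", "1"),
                Bean.of("u001", "A1", "1"),
                Bean.of("u002", "A1", "1")));
        System.out.println(r.avtId + "/" + r.type + " count=" + r.count + " uid=" + r.uid);
        // prints: A1/1 count=3 uid=u001
    }
}
```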

3. Constant (custom constants)

public class Constant {
    public static final String ACTIVITY_COUNT = "ACTIVITY_COUNT";
    public static final String DAILY_ACTIVITY_COUNT = "DAILY_ACTIVITY_COUNT";
    public static final String PROVINCE_DAILY_ACTIVITY_COUNT = "PROVINCE_DAILY_ACTIVITY_COUNT";
}
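These constants are the Redis key prefixes, and the job appends the grouping values to them with "-". The hypothetical helper below reproduces that key format. String literals replace the Constant fields so that it compiles on its own. It also shows a pitfall: the date already contains hyphens, so splitting a key on "-" yields six parts instead of four.

```java
// Composes Redis keys in the PREFIX-dim1-dim2-... style used by the job (helper is hypothetical).
class RedisKeySketch {

    // Prefix followed by the dimension values, all joined with "-"
    static String cubeKey(String prefix, String... dims) {
        return prefix + "-" + String.join("-", dims);
    }

    public static void main(String[] args) {
        System.out.println(cubeKey("ACTIVITY_COUNT", "A1"));
        System.out.println(cubeKey("DAILY_ACTIVITY_COUNT", "A1", "2019-09-02"));
        String k = cubeKey("PROVINCE_DAILY_ACTIVITY_COUNT", "A1", "2019-09-02", "北京市");
        System.out.println(k);
        // The date contains "-" itself, so splitting gives 6 parts, not prefix + 3 dimensions
        System.out.println(k.split("-").length);
    }
}
```

If keys ever need to be parsed back, choose a separator that cannot appear in the values, such as the colon conventionally used in Redis key names. That avoids the ambiguity.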

4. Custom RedisSink

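  • Jedis is a Java client for Redis. It wraps the Redis commands as Java methods and also provides connection pooling (JedisPool). The sink below does not use the pool; each parallel sink instance opens its own Jedis connection in open().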
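import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;

// Writes (key, field, value) triples to Redis hashes with HSET
public class MySinkToRedis extends RichSinkFunction<Tuple3<String, String, String>> {

    // The Jedis connection is not serializable: mark it transient and create it in open()
    private transient Jedis jedis;

    // Create the Redis connection (called once per parallel sink instance)
    @Override
    public void open(Configuration parameters) throws Exception {
        // 1. Get the global job parameters; main() must have registered a ParameterTool
        //    with env.getConfig().setGlobalJobParameters(...), otherwise this cast fails
        ParameterTool params = (ParameterTool) getRuntimeContext()
                .getExecutionConfig()
                .getGlobalJobParameters();

        // 2. Read the connection settings
        String host = params.getRequired("redis.host");
        String password = params.get("redis.password"); // optional: null if not given
        int port = params.getInt("redis.port", 6379);
        int db = params.getInt("redis.db", 0);

        // 3. Open the connection, authenticate if a password was given, and select the database
        Jedis jedis = new Jedis(host, port);
        if (password != null && !password.isEmpty()) {
            jedis.auth(password);
        }
        jedis.select(db);
        this.jedis = jedis;
    }

    // Store one record: HSET key field value
    @Override
    public void invoke(Tuple3<String, String, String> input, Context context) throws Exception {
        // Reconnect if the socket is no longer connected
        if (!jedis.isConnected()) {
            jedis.connect();
        }
        jedis.hset(input.f0, input.f1, input.f2);
    }

    @Override
    public void close() throws Exception {
        // open() may have failed before the connection was created
        if (jedis != null) {
            jedis.close();
        }
    }
}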
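The sink only finds its settings if main() registered them as global job parameters, for example with env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args)). Without that, the ParameterTool cast in open() will most likely fail. The sink reads redis.host (required), redis.port (default 6379), redis.db (default 0) and redis.password. The plain-Java sketch below is a simplified stand-in for ParameterTool's lookups, meant only to show how such arguments resolve. It is not Flink's implementation and, unlike ParameterTool, it does not accept flags without a value.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical stand-in for ParameterTool.fromArgs + getRequired/getInt.
class RedisArgsSketch {

    // Parses "--key value" pairs; rejects anything else
    static Map<String, String> fromArgs(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            if (!args[i].startsWith("--") || i + 1 >= args.length) {
                throw new IllegalArgumentException("Expected '--key value' at position " + i);
            }
            params.put(args[i].substring(2), args[i + 1]);
        }
        return params;
    }

    // Fails fast when a mandatory setting is missing
    static String getRequired(Map<String, String> params, String key) {
        String value = params.get(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing required parameter: " + key);
        }
        return value;
    }

    // Falls back to a default when the setting is absent
    static int getInt(Map<String, String> params, String key, int defaultValue) {
        String value = params.get(key);
        return value == null ? defaultValue : Integer.parseInt(value);
    }

    public static void main(String[] args) {
        // Example launch arguments (hypothetical host and password)
        Map<String, String> p = fromArgs(new String[] {
                "--redis.host", "linux01", "--redis.password", "secret"});
        System.out.println(getRequired(p, "redis.host"));    // linux01
        System.out.println(getInt(p, "redis.port", 6379));  // 6379 (default)
        System.out.println(getInt(p, "redis.db", 0));       // 0 (default)
    }
}
```

When the job is submitted with flink run, program arguments go after the jar. For example: flink run -c ActivityCountWithMultiDimension <jar> --redis.host linux01 --redis.password <password>.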
Posted by IT_但丁