您的位置:首页 > 其它

Spark实现用户量增长实时统计

2018-03-12 16:41 337 查看
项目背景:公司产品希望在大屏上实时显示目前的用户数量
系统架构背景:业务系统在oracle上运行,之前的项目阶段已经将oracle中的数据通过Goldengate以json格式推送到kafka中。
数据流:
OGG--->Kafka--->Spark--->Redis---->Web

本文主要介绍Kafka至Redis部分:
初始化Redis中的用户量:   Jedis jedis = JedisUtil.getJedis();
String sql = "select count(*) from ogg.per_user";
Connection conn = JDBC.getConnection();
Statement stmt = conn.createStatement();
ResultSet res = stmt.executeQuery(sql);
while(res.next()) {
String CountNumber = res.getString(1);
jedis.set("sum", CountNumber);
}
JDBC.colseResource(conn, stmt, res);
主业务逻辑实现(统计数据插入与删除量): JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, zkQuorum, group,topicmap);

JavaDStream<String> lines = messages.flatMap(x -> Lists.newArrayList(x._2));

JavaPairDStream<String, Integer> wordMap = lines.mapToPair(x->{
JSONObject json = JSONObject.parseObject(x);
String op_type = json.getString("op_type");
if(op_type.equals("I")) {
return new Tuple2<String, Integer>("I", 1);
}
else if(op_type.equals("D")){
return new Tuple2<String, Integer>("D", 1);
}
else {
return null;
}
});
wordMap.foreachRDD(x->{
x.foreachPartition(p->{
Jedis jedis = JedisUtil.getJedis();
p.forEachRemaining(y->{
if(y._1().equals("I")) {
String s = MathUtil.bigNumberPlus(jedis.get("sum"),y._2().toString());
jedis.set("sum", s);
}
if(y._1().equals("D")) {
String s = MathUtil.bigNumberSub(jedis.get("sum"),y._2().toString());
jedis.set("sum", s);
}
});
});
});数学函数部分: public static String bigNumberPlus(String a, String b) {
int lenA = a.length();
int lenB = b.length();
if (lenA > lenB) {
b = StringUtils.leftPad(b, lenA, "0");
} else {
a = StringUtils.leftPad(a, lenB, "0");
}

int[] arrC = new int[a.length() + 1];

for (int i = a.length() - 1; i >= 0; i--) {
int ai = Integer.parseInt(a.charAt(i) + "");
int bi = Integer.parseInt(b.charAt(i) + "");
int ci = arrC[i + 1];
int t = ai + bi + ci;
arrC[i + 1] = t % 10;
arrC[i] = t / 10;
}

StringBuffer res = new StringBuffer();
for (int i = 0; i < arrC.length; i++) {
if (i == 0 && arrC[i] == 0)
continue;
res.append(arrC[i]);
}
return res.toString();
}

public static String bigNumberSub(String f, String s) {
// 将字符串翻转并转换成字符数组
char[] a = new StringBuffer(f).reverse().toString().toCharArray();
char[] b = new StringBuffer(s).reverse().toString().toCharArray();
int lenA = a.length;
int lenB = b.length;
// 找到最大长度
int len = lenA > lenB ? lenA : lenB;
int[] result = new int[len];
// 表示结果的正负
char sign = '+';
// 判断最终结果的正负
if (lenA < lenB) {
sign = '-';
} else if (lenA == lenB) {
int i = lenA - 1;
while (i > 0 && a[i] == b[i]) {
i--;
}
if (a[i] < b[i]) {
sign = '-';
}
}
// 计算结果集,如果最终结果为正,那么就a-b否则的话就b-a
for (int i = 0; i < len; i++) {
int aint = i < lenA ? (a[i] - '0') : 0;
int bint = i < lenB ? (b[i] - '0') : 0;
if (sign == '+') {
result[i] = aint - bint;
} else {
result[i] = bint - aint;
}
}
// 如果结果集合中的某一位小于零,那么就向前一位借一,然后将本位加上10。其实就相当于借位做减法
for (int i = 0; i < result.length - 1; i++) {
if (result[i] < 0) {
result[i + 1] -= 1;
result[i] += 10;
}
}

StringBuffer sb = new StringBuffer();
// 如果最终结果为负值,就将负号放在最前面,正号则不需要
if (sign == '-') {
sb.append('-');
}
// 判断是否有前置0
boolean flag = true;
for (int i = len - 1; i >= 0; i--) {
if (result[i] == 0 && flag) {
continue;
} else {
flag = false;
}
sb.append(result[i]);
}
// 如果最终结果集合中没有值,就说明是两值相等,最终返回0
if (sb.toString().equals("")) {
sb.append("0");
}
// 返回值
return sb.toString();
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: