重要|flink的时间及时区问题解决
重要|flink的时间及时区问题解决
浪院长 浪尖聊大数据
1.时间纪元
所谓的”时间纪元”就是1970年1月1日0时0分0秒,指的是开始的时间。比如Java类代码:
Date date = new Date(0);
System.out.println(date);
打印出来的结果:
Thu Jan 01 08:00:00 CST 1970
也是1970年1月1日,实际上时分秒是0点0分0秒,这里打印出来的时间是8点而非0点,原因是存在系统时间和本地时间的问题,其实系统时间依然是0点,只不过我们的电脑时区设置为东8区,故打印的结果是8点。
只需要将时区设置为GMT+0,即可打印出0点0分0秒
System.setProperty("user.timezone","GMT+0");
实际上时区问题都是在此时间纪元基础上加/减一定的offset。
2.Flink时间
说java纪元跟本文将的flink时间问题有啥关系呢?
Flink在使用时间的这个概念的时候就是基于时间纪元这个概念的。比如首先,我们的时区是东八区,在我们的视野中UTC-0时间应该加8小时的offset,才是我们看到的时间,所以在使用flink的窗口的时候往往比我们当前的时间少8小时。
还有flink的窗口对其,也是基于纪元时间的。比如下面的有三个窗口函数的例子
1).5min滚动窗口
14:16:391启动的窗口,滚动窗口时间是5min,会发现并不是等待五分钟之后才有结果输出,而是到了14:20:00.0的时候就直接输出结果了。
2).30min滚动窗口
14:27:11启动的滚动窗口,是在14:30:00的时候就直接输出了,而不是等待半小时。
3).1hour滚动窗口
15:54:48启动的一小时的滚动窗口,输出时间是16点整。
时间上差了八小时,但是对齐是基于时间纪元的整数单位。
3.解决差八小时问题
实际在使用的时候flink输出的时差很令人反感,但是没办法flink目前不支持配置时区,但是blink支持,等待着合并吧。
其实,时区问题解决方案比较多吧,要想不伤筋动骨,主要介绍以下三种:
-
flink端不做处理。也即是在读取数据的时候加上8小时的offset。
-
使用udf等算子给时间戳加上8小时的offset。
- sink内部做处理。
1).Udf实现
sink端处理
import org.apache.flink.table.functions.ScalarFunction; import java.sql.Timestamp; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.TimeZone; public class UTC2Local extends ScalarFunction { public Timestamp eval(Timestamp s) { long timestamp = s.getTime() + 28800000; return new Timestamp(timestamp); } }
注册udf
tEnv.registerFunction("utc2local",new UTC2Local());
使用udf
Table table1 = tEnv.sqlQuery("select count(number),utc2local(TUMBLE_END(proctime, INTERVAL '1' HOUR)) from res group by TUMBLE(proctime, INTERVAL '1' HOUR)");
2). sink内部支持
sink端的实现也比较简单,主要是判断输出字段类型,然后加上8小时offset即可。可以参考blink的printtablesink的实现。
override def invoke(in: JTuple2[JBool, Row]): Unit = { val sb = new StringBuilder val row = in.f1 for (i <- 0 to row.getArity - 1) { if (i > 0) sb.append(",") val f = row.getField(i) if (f.isInstanceOf[Date]) { sb.append(DateTimeFunctions.dateFormat(f.asInstanceOf[JDate].getTime, "yyyy-MM-dd", tz)) } else if (f.isInstanceOf[Time]) { sb.append(DateTimeFunctions.dateFormat(f.asInstanceOf[JDate].getTime, "HH:mm:ss", tz)) } else if (f.isInstanceOf[Timestamp]) { sb.append(DateTimeFunctions.dateFormat(f.asInstanceOf[JDate].getTime, "yyyy-MM-dd HH:mm:ss.SSS", tz)) } else { sb.append(StringUtils.arrayAwareToString(f)) } } if (in.f0) { System.out.println(prefix + "(+)" + sb.toString()) } else { System.out.println(prefix + "(-)" + sb.toString()) } }
- 解决Docker容器时区及时间不同步问题
- 两种方法解决:mybatis查询mysql的datetime类型数据时间差了8小时(时区问题)
- Linux时间与Windows差8个时区的问题解决方法
- 解决Docker容器时区及时间不同步的问题
- 你应当如何学习C++以及编程(细节是必要的,但不是重要的,把时间用在集中精力去解决问题,而不是学习新技术,那样练不成高手。在实践中提高才是最重要的。最最重要的内功还是长期学习所磨练出来的自学能力)good
- 解决Docker容器时区及时间不同步问题的方法
- php解决时间不正确问题,设置中国时区
- 解决Flink输出日志中时间比当前时间晚8个小时的问题
- 解决ubuntu设置时区的问题(系统安装好后出现时间相差16小时)
- <Power Shell>12 解决本地工作站或者远程计算机时间和时区的问题
- Linux 下Tomcat容器启动设置时区 解决tomcat时间统一的问题 总是慢8小时
- linux环境下时区无法设置(系统时间慢8个小时)的问题解决
- 关于xxl-job两台机器时间或者时区不同的问题解决。
- MTK 时间 时区问题解决方案
- PHP的php.ini时区设置问题 解决时间相差8小时问题
- 解决Flink输出日志中时间比当前时间晚8个小时的问题
- Django时区设置, 解决mysql存入时间偏差问题
- Windows没有北京时间(GMT+8时区)的问题,及QQ聊天栏中显示的时间错误的问题解决办法
- PHP获取当前时间戳,当前时间、及解决时区问题
- java 解决时间差8个小时的问题、Linux系统中时区的修改及tomcat时区的修改