记录一下互联网日志实时收集和实时计算的简单方案
2016-11-21 09:38
507 查看
作为互联网公司,网站监测日志当然是数据的最大来源。我们目前的规模也不大,每天的日志量大约1TB。后续90%以上的业务都是需要基于日志来完成,之前,业务中对实时的要求并不高,最多也就是准实时(延迟半小时以上),因此,我们使用Flume将数据收集到HDFS,然后进行清洗和分析。
后来,根据业务需要,我们有了两个Hadoop集群,并且部署在不同的地方(北京和西安),而所有的日志收集服务器在北京,因此需要将日志数据通过外网传输到西安,于是有了这样的部署:
很快,通过Flume流到西安Hadoop集群的数据就遇到了问题,比原始数据多或者少一些,造成这个问题的主要原因是在网络不稳定的情况下,北京Flume Agent发送到西安Flume Collector的过程中,会发送失败,或者响应失败。另外,之前的数据准实时也不能满足业务的需求。
为了解决数据实时跨外网传输以及实时业务的问题,于是有了现在的架构:
引入Kafka,并且和日志收集服务器部署在北京同机房;
每台日志收集服务器上的Flume Agent,通过内网将数据发送至Kafka;
Kafka的第一个消费者,北京网关机上的Flume,负责从Kafka中消费数据,然后流到北京Hadoop集群;
Kafka的第二个消费者,西安网关机上的Flume,负责从Kafka中消费数据,然后流到西安Hadoop集群;这里是西安的Flume通过外网连接北京Kafka,主动拉取数据,如果网络不稳定,那么当前批次拉取失败,最多重新拉一次,数据不会进Flume channel,更不会流到HDFS上,因此,这种方式在网络不稳定的情况下,不会造成数据缺失或重复;
Kafka的第三个消费者,北京网关机上的实时计算模块,后面再说;
Kafka的第N个消费者,其他;
这种架构下,Kafka成为了统一的日志数据提供者,至关重要。我们目前有4台Broker节点,每个Topic在创建时候都指定了4个分区,副本数为2;
数据在进入Kafka分区的时候,使用了Flume的拦截器,从日志中提取用户ID,然后通过HASH取模,将数据流到Kafka相应的分区中。这种方式,一方面,完成了简单的负载均衡,另一方面,确保相同的用户数据都处于同一个分区中,为后面实时计算模块的统计提供了极大的便利。
在整个流程中,有两个地方用到了同一个Flume拦截器(Regex Extractor Interceptor),就是在Flume Source中从消息中提取数据,并加入到Header,供Sink使用;
一处是在LogServer上部署的Flume Source,它从原始日志中提取出用户ID,然后加入到Header中,Flume Sink(Kafka Sink)再入Kafka之前,从Header中拿出该用户ID,然后通过应用分区规则,将该条消息写入Kafka对应的分区中;
另外一处是部署在西安的Flume Source,它从Kafka中读取消息之后,从消息中抽取出时间字段,并加入到Header中,后面的Flume Sink(HDFS Sink)通过读取Header中时间,根据消息中的时间,将数据写入HDFS相应的目录和文件中。如果在HDFS Sink中仅仅使用当前时间来确定HDFS目录和文件名称,这样会造成一小部分数据没有写入到正确的目录和文件中,比如:日志中8点59分59秒的数据可能会被写进HDFS上9点的目录和文件中,因为原始数据经过Kafka,通过外网传输到西安的Flume,有个几秒的延时,那是很正常的。
在北京部署的Flume,使用Kafka Source从Kafka中读取数据流向北京Hadoop集群,西安的也一样,在消费同一Topic的消息时候,我们都是在两台机器上启动了两个Flume Agent,并且设置的统一消费组(group.id),根据Kafka相同的Topic,一条消息只能被同一消费组内的一个消费者消费,因此,Kafka中的一条消息,只会被这两个Flume Agent其中的一个消费掉,如果一个Flume Agent挂掉,那么另外一个将会消费所有消息;
这种方式,也是在流向HDFS的消费者端做了负载均衡和容错。
目前我们实时计算的业务比较简单,就是类似于根据不同维度统计PV和UV。比如:实时统计一个网站当天的累计PV、UV、IP数等,目前我们直接开发的JAVA程序,使用streamlib统计这些指标,UV和IP数这种需要去重的指标有2%以内的误差,业务可以接受。
实时计算模块使用Kafka low-level API,针对每一个Topic,都使用和分区数相等的线程去处理,每个线程消费一个分区的数据,由于数据在进入Kafka分区的时候,都是经过相应规则的分区,因此相同用户的数据会在同一个分区中;
另外,每个线程会在Redis中维护自己当前的Offsets,比如:在实时计算当天累计指标的业务场景中,每天0天在Redis中记录当前的Offsets,这样,如果实时计算程序挂掉,下次启动时候,从Redis中读取当天的Offsets,重新读取和计算当天的所有消息。
由于我们的需求是实时统计当天累计的指标,而且能接受一定的误差,因此采用这种方式。如果需要精确统计累计去重指标,那么可能需要采用其它方式,比如:精确统计当天实时累计用户数,一种简单的办法是在HBase中使用计数器来配合完成。
如果需要实时统计一小段时间(比如十分钟、一小时)之内的PV、UV等指标,那么可以使用SparkStreaming来完成,比较简单。如果单独使用Spark Streaming来完成一天内海量数据的累计去重统计,我还不太清楚有什么好的解决办法。
另外,实时OLAP也可能作为Kafka的实时消费者应用,比如:Druid。
后来,根据业务需要,我们有了两个Hadoop集群,并且部署在不同的地方(北京和西安),而所有的日志收集服务器在北京,因此需要将日志数据通过外网传输到西安,于是有了这样的部署:
很快,通过Flume流到西安Hadoop集群的数据就遇到了问题,比原始数据多或者少一些,造成这个问题的主要原因是在网络不稳定的情况下,北京Flume Agent发送到西安Flume Collector的过程中,会发送失败,或者响应失败。另外,之前的数据准实时也不能满足业务的需求。
为了解决数据实时跨外网传输以及实时业务的问题,于是有了现在的架构:
引入Kafka,并且和日志收集服务器部署在北京同机房;
每台日志收集服务器上的Flume Agent,通过内网将数据发送至Kafka;
Kafka的第一个消费者,北京网关机上的Flume,负责从Kafka中消费数据,然后流到北京Hadoop集群;
Kafka的第二个消费者,西安网关机上的Flume,负责从Kafka中消费数据,然后流到西安Hadoop集群;这里是西安的Flume通过外网连接北京Kafka,主动拉取数据,如果网络不稳定,那么当前批次拉取失败,最多重新拉一次,数据不会进Flume channel,更不会流到HDFS上,因此,这种方式在网络不稳定的情况下,不会造成数据缺失或重复;
Kafka的第三个消费者,北京网关机上的实时计算模块,后面再说;
Kafka的第N个消费者,其他;
Kafka中的数据分区及副本
这种架构下,Kafka成为了统一的日志数据提供者,至关重要。我们目前有4台Broker节点,每个Topic在创建时候都指定了4个分区,副本数为2;数据在进入Kafka分区的时候,使用了Flume的拦截器,从日志中提取用户ID,然后通过HASH取模,将数据流到Kafka相应的分区中。这种方式,一方面,完成了简单的负载均衡,另一方面,确保相同的用户数据都处于同一个分区中,为后面实时计算模块的统计提供了极大的便利。
Flume拦截器的使用
在整个流程中,有两个地方用到了同一个Flume拦截器(Regex Extractor Interceptor),就是在Flume Source中从消息中提取数据,并加入到Header,供Sink使用;一处是在LogServer上部署的Flume Source,它从原始日志中提取出用户ID,然后加入到Header中,Flume Sink(Kafka Sink)再入Kafka之前,从Header中拿出该用户ID,然后通过应用分区规则,将该条消息写入Kafka对应的分区中;
另外一处是部署在西安的Flume Source,它从Kafka中读取消息之后,从消息中抽取出时间字段,并加入到Header中,后面的Flume Sink(HDFS Sink)通过读取Header中时间,根据消息中的时间,将数据写入HDFS相应的目录和文件中。如果在HDFS Sink中仅仅使用当前时间来确定HDFS目录和文件名称,这样会造成一小部分数据没有写入到正确的目录和文件中,比如:日志中8点59分59秒的数据可能会被写进HDFS上9点的目录和文件中,因为原始数据经过Kafka,通过外网传输到西安的Flume,有个几秒的延时,那是很正常的。
Flume消费者的负载均衡和容错
在北京部署的Flume,使用Kafka Source从Kafka中读取数据流向北京Hadoop集群,西安的也一样,在消费同一Topic的消息时候,我们都是在两台机器上启动了两个Flume Agent,并且设置的统一消费组(group.id),根据Kafka相同的Topic,一条消息只能被同一消费组内的一个消费者消费,因此,Kafka中的一条消息,只会被这两个Flume Agent其中的一个消费掉,如果一个Flume Agent挂掉,那么另外一个将会消费所有消息;这种方式,也是在流向HDFS的消费者端做了负载均衡和容错。
实时计算模块
目前我们实时计算的业务比较简单,就是类似于根据不同维度统计PV和UV。比如:实时统计一个网站当天的累计PV、UV、IP数等,目前我们直接开发的JAVA程序,使用streamlib统计这些指标,UV和IP数这种需要去重的指标有2%以内的误差,业务可以接受。实时计算模块使用Kafka low-level API,针对每一个Topic,都使用和分区数相等的线程去处理,每个线程消费一个分区的数据,由于数据在进入Kafka分区的时候,都是经过相应规则的分区,因此相同用户的数据会在同一个分区中;
另外,每个线程会在Redis中维护自己当前的Offsets,比如:在实时计算当天累计指标的业务场景中,每天0天在Redis中记录当前的Offsets,这样,如果实时计算程序挂掉,下次启动时候,从Redis中读取当天的Offsets,重新读取和计算当天的所有消息。
由于我们的需求是实时统计当天累计的指标,而且能接受一定的误差,因此采用这种方式。如果需要精确统计累计去重指标,那么可能需要采用其它方式,比如:精确统计当天实时累计用户数,一种简单的办法是在HBase中使用计数器来配合完成。
其它实时数据消费者
如果需要实时统计一小段时间(比如十分钟、一小时)之内的PV、UV等指标,那么可以使用SparkStreaming来完成,比较简单。如果单独使用Spark Streaming来完成一天内海量数据的累计去重统计,我还不太清楚有什么好的解决办法。另外,实时OLAP也可能作为Kafka的实时消费者应用,比如:Druid。
相关文章推荐
- 记录一下互联网日志实时收集和实时计算的简单方案
- 互联网日志实时收集和实时计算的简单方案
- 日志实时收集和计算的简单方案
- 简单ELK收集日志方案
- flume-kafka- spark streaming日志收集实时计算遇到以下问题,求解决
- 互联网大数据日志收集离线实时分析实战案例
- flume-kafka- spark streaming(pyspark) - redis 实时日志收集实时计算
- flume-kafka- spark streaming(pyspark) - redis 实时日志收集实时计算
- flume-kafka- spark streaming(pyspark) - redis 实时日志收集实时计算 + Spark 基于pyspark下的实时日志分析
- 极其简单的PIM---TeenyPIM 0.1(专为记录工作日志及常被打断的工作设计)
- Linux系统日志记录到远程服务器简单配置
- [工会网站建设方案-日志]第一阶段 策划:收集、整理、分析
- SUSE下telnet的简单配法,转贴 -- 记录一下,以便备份
- 简单说一下我对这个CRM系统的设计方案
- Linux系统日志记录到远程服务器简单配置
- 简单记录一下Linux gadget serial的工作
- 简单计算一下,发现炒房一点不划算
- 公布一个简单的日志记录方法 【转】-要研究
- VeryCodes.Log让日志记录和读取变的更简单
- 贴一下Log4cplus每天记录一个日志文件的配置。