使用Flume采集流式数据发送到Kafka,再由Flink消费Kafka数据,实现电话号码统计
2020-06-05 06:20
429 查看
def main(args: Array[String]): Unit = {
//创建flink执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //设置并行度 env.setParallelism(1) //设置时间语义 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val properties = new Properties() properties.setProperty("bootstrap.servers", "hdp111:9092,hdp222:9092,hdp333:9092") properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") properties.setProperty("group.id", "test") val dataStream = env.addSource(new FlinkKafkaConsumer[String]("mykafka", new SimpleStringSchema(), properties)) //val dataStream = env.readTextFile("C:\\Intel\\day0411\\input\\ww.txt") .map(data=>{ val strings = data.split("\t") LongEv(strings(0).toLong,strings(1).toLong) }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LongEv](Time.seconds(1)) { override def extractTimestamp(element: LongEv): Long = element.elTime*1000 })
// 6. 统计出5s内电话号码出现的总次数并打印到控制台
// dataStream.map(x=>(“电话号码总数”,1)).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print(“五秒内出现的总次数”)
// 7. 统计出5s内电话号码是偶数的电话并打印到控制台
// dataStream.filter(.phone % 2 == 0).map(x=>(x.phone, 1)).timeWindowAll(Time.seconds(5)).sum(1).print(“五秒内出现的偶数车牌”)
// 8. 统计出10s内同一电话号码出现次数超过2次及以上的电话号码并打印到控制台
// dataStream.map(x=>(x.phone,1)).keyBy(.1).timeWindow(Time.seconds(10)).sum(1).filter(._2>=2).print(“手机号出现次数超过2的”)
env.execute()
}
相关文章推荐
- 使用Flume采集流式数据发送到Kafka,再由Flink消费Kafka数据,实现电话号码统计本地文件读取
- 数据采集之解析Mysql的binlog日志发送至Kafka实时消费
- 数据采集之解析Mysql的binlog日志发送至Kafka实时消费
- Kafka 使用Java实现数据的生产和消费demo
- 使用Flume消费Kafka数据到HDFS
- Flink1.8消费Kafka的Nginx数据统计——02
- kafka常用命令&&flume和kafka整合&&用spark消费kafka中的数据实现wordcount&&将处理好的数据存到redis中
- 2016年大数据Spark“蘑菇云”行动之spark streaming消费flume采集的kafka数据Directf方式
- Flume的使用一:从指定端口采集数据发送到工作台
- flume实现kafka到hdfs实时数据采集 - 有负载均衡策略
- Kafka 使用Java实现数据的生产和消费demo
- Flume简介与使用(三)——Kafka Sink消费数据之Kafka安装
- kafka连接flink流计算,实现flink消费kafka的数据
- DataTable用中使用Compute 实现简单的DataTable数据的统计
- 使用Python实现子区域数据分类统计
- 使用AIDL实现IPC通信之——实现服务端主动发送数据给客户端
- 使用Python实现子区域数据分类统计
- flume采集数据到kafka和hive
- 分析各种Android设备屏幕分辨率与适配 - 使用大量真实安卓设备采集真实数据统计
- FLume监控文件夹,将数据发送给Kafka以及HDFS的配置文件详解