[Scala] Flink Project: Real-Time Traffic Statistics (2)
2020-01-13 15:58
Series Navigation
[Scala] Flink Project Example Series (0)
[Scala] Flink Project: Real-Time Hot Items Statistics (1)
[Scala] Flink Project: Real-Time Traffic Statistics (2)
[Scala] Flink Project: Malicious Login Monitoring (3)
[Scala] Flink Project: Order Payment Timeout Monitoring (4)
[Scala] Flink Project: Real-Time Order Payment Reconciliation (5)
[Scala] Flink Project: A Little Easter Egg (6)
The code and files for this project are available here; the access code is 3n9z.
Project Requirements
There are six sub-requirements:
- Count IP visits per minute, output the 5 most-visited addresses, updated every 5 seconds
- Count page views (PV) per hour
- Count the site's unique visitors (UV) per hour
- Same as the previous requirement, but assuming the data volume is far too large to hold in memory
- Count user behaviors (browse, click, purchase, etc.) per channel (e.g. different app stores)
- Count ad clicks per region per hour, refreshed every 10 minutes; users who click a single ad too frequently are filtered into a blacklist, banned for one day
Approach
One point for each of the six requirements above:
- Similar to "Hot Items Statistics", so omitted here
- Simply map the stream to (data, 1), then accumulate per window
- Deduplicate on top of the PV logic, using a Set
- Use a Bloom filter together with Redis's setbit operation
- With channels: combine channel and behavior into a tuple, treat it as a composite key, and count per window; without channels: aggregate directly, similar to "Hot Items Statistics" but with no sorting needed
- The counting part resembles the earlier requirements; the blacklist part uses a side output stream, and the one-day ban is implemented with a timer
Code Implementation
Hot Page Statistics
Data source structure
ip | userId | eventTime | method | url |
---|---|---|---|---|
50.16.19.13 | - | 17/05/2015:10:05:10 | GET | /blog/tags/puppet?flav=rss20 |
[code]
import java.text.SimpleDateFormat
import java.sql.Timestamp

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

/* Requirement: every 5 seconds, output the top-N most-visited URLs of the last 10 minutes; similar to the HotItems requirement */

case class ApacheLogEvent(ip: String, userId: String, eventTime: Long, method: String, url: String)

case class UrlViewCount(url: String, windowEnd: Long, count: Long)

object NetworkFlow {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val source = getClass.getResource("/apache.log")
    val dataStream = env.readTextFile(source.getPath)
      .map(line => {
        val lineArray = line.split(" ")
        val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
        val timestamp = simpleDateFormat.parse(lineArray(3)).getTime
        ApacheLogEvent(lineArray(0), lineArray(2), timestamp, lineArray(5), lineArray(6))
      })
      // Time.seconds sets the allowed out-of-orderness; tune it to how late the data actually is.
      // Anything later can be handled at the window level, and as a last resort via a side output.
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.seconds(1)) {
        override def extractTimestamp(t: ApacheLogEvent): Long = t.eventTime // must be milliseconds; parse().getTime above already returns ms
      }) // out-of-order data needs a watermark
      .filter(data => {
        val pattern = "^((?!\\.(css|js|png|ico)$).)*$".r // regex; https://regex101.com is a handy site for writing these
        (pattern findFirstIn data.url).nonEmpty
      })
      .keyBy(_.url)
      .timeWindow(Time.minutes(10), Time.seconds(5))
      .aggregate(new CountAgg(), new WindowResult())

    val processedStream = dataStream
      .keyBy(_.windowEnd)
      .process(new TopNHotUrls(5))

    // dataStream.print("aggregate")
    processedStream.print("process")

    env.execute("network flow job")
  }
}

class CountAgg() extends AggregateFunction[ApacheLogEvent, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def merge(acc: Long, acc1: Long): Long = acc + acc1
  override def getResult(acc: Long): Long = acc
  override def add(in: ApacheLogEvent, acc: Long): Long = acc + 1
}

class WindowResult() extends WindowFunction[Long, UrlViewCount, String, TimeWindow] {
  override def apply(url: String, window: TimeWindow, input: Iterable[Long], out: Collector[UrlViewCount]): Unit = {
    val count = input.iterator.next
    out.collect(UrlViewCount(url, window.getEnd, count))
  }
}

class TopNHotUrls(topNum: Int) extends KeyedProcessFunction[Long, UrlViewCount, String] {
  private var urlState: ListState[UrlViewCount] = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters) // the base implementation of open is empty, so this call is optional
    val urlStateDesc = new ListStateDescriptor[UrlViewCount]("urlState-state", classOf[UrlViewCount])
    urlState = getRuntimeContext.getListState(urlStateDesc)
  }

  override def processElement(value: UrlViewCount, ctx: KeyedProcessFunction[Long, UrlViewCount, String]#Context, out: Collector[String]): Unit = {
    urlState.add(value)
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, UrlViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
    val allUrlViews: ListBuffer[UrlViewCount] = ListBuffer()
    import scala.collection.JavaConversions._
    for (urlView <- urlState.get) {
      // alternatively: take urlState.get().iterator(), check hasNext, and read with next
      allUrlViews += urlView
    }
    urlState.clear()

    // val sortedUrlViews = allUrlViews.sortBy(_.count)(Ordering.Long.reverse).take(topNum) // an alternative sort operator
    val sortedUrlViews = allUrlViews.sortWith(_.count > _.count).take(topNum)

    val result: StringBuilder = new StringBuilder
    result.append("====================================\n")
    result.append("Time: ").append(new Timestamp(timestamp - 1)).append("\n")
    for (i <- sortedUrlViews.indices) {
      val currentUrlView: UrlViewCount = sortedUrlViews(i)
      result.append("No").append(i + 1).append(":")
        .append(" URL=").append(currentUrlView.url)
        .append(" views=").append(currentUrlView.count).append("\n")
    }
    result.append("====================================\n\n")

    // throttle the output rate to simulate rolling real-time results
    Thread.sleep(1000)
    out.collect(result.toString)
  }
}
[/code]
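The watermark above only covers one second of disorder; as the in-code comment says, anything later can go through the window's own lateness handling or, failing that, a side output. Here is a minimal sketch of that pattern, assuming the post-`filter` stream is bound to a name like `filteredStream` (the name, the tag, and the one-minute lateness are illustrative assumptions, not part of the original job):
[code]
// Sketch: window-level lateness plus a side output for records later still.
val lateTag = new OutputTag[ApacheLogEvent]("late-data") // assumed tag name

val aggStream = filteredStream // hypothetical handle on the stream after .filter
  .keyBy(_.url)
  .timeWindow(Time.minutes(10), Time.seconds(5))
  .allowedLateness(Time.minutes(1)) // windows keep accepting updates for one extra minute
  .sideOutputLateData(lateTag)      // anything later than that lands in the side output
  .aggregate(new CountAgg(), new WindowResult())

aggStream.getSideOutput(lateTag).print("late") // inspect or reprocess the stragglers
[/code]
With allowedLateness, each late record re-fires its window and emits an updated UrlViewCount, so the downstream TopN function has to tolerate repeated results for the same windowEnd.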
PV
Data source structure
userId | itemId | categoryId | behavior | timestamp |
---|---|---|---|---|
543462 | 1715 | 1464116 | pv | 1511658000 |
[code]
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// case class for the input records
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)

// case class for the windowed aggregation result
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

object PageView {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val source = getClass.getResource("/UserBehavior.csv")
    val dataStream = env.readTextFile(source.getPath)
      .map(line => {
        val lineArray = line.split(",")
        UserBehavior(lineArray(0).trim.toLong, lineArray(1).trim.toLong, lineArray(2).trim.toInt,
          lineArray(3).trim, lineArray(4).trim.toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000)

    // the rest is straightforward: filter pv events, map to ("pv", 1), window, sum
    val processedStream = dataStream
      .filter(_.behavior == "pv")
      .map(x => ("pv", 1))
      .keyBy(_._1)
      .timeWindow(Time.hours(1))
      .sum(1)
      .print()

    env.execute("PV Job")
  }
}
[/code]
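One limitation of this job: every record carries the literal key "pv", so the whole hourly count funnels through a single parallel subtask. A common refinement, sketched below under assumptions (the 8-way random key and the PvCountAgg/PvWindowResult names are mine, not part of the original project), is to pre-count under random keys and then re-combine the partial counts by window end:
[code]
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

// incremental partial count per random key
class PvCountAgg() extends AggregateFunction[(Int, Long), Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(in: (Int, Long), acc: Long): Long = acc + 1
  override def getResult(acc: Long): Long = acc
  override def merge(a: Long, b: Long): Long = a + b
}

// attach the window end so partials from different keys can be matched up
class PvWindowResult() extends WindowFunction[Long, (Long, Long), Int, TimeWindow] {
  override def apply(key: Int, window: TimeWindow, input: Iterable[Long], out: Collector[(Long, Long)]): Unit =
    out.collect((window.getEnd, input.iterator.next()))
}

// inside main, replacing the single-key pipeline:
val pvStream = dataStream
  .filter(_.behavior == "pv")
  .map(_ => (Random.nextInt(8), 1L)) // random key in [0, 8) fans records out
  .keyBy(_._1)
  .timeWindow(Time.hours(1))
  .aggregate(new PvCountAgg(), new PvWindowResult()) // (windowEnd, partial count)
  .keyBy(_._1)                                       // regroup by window end
  .sum(1)                                            // running total of the partials
[/code]
Note that the final `.sum(1)` emits a running total as each partial arrives; the last record per windowEnd carries the full PV count.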
UV
[code]
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class UvCount(windowEnd: Long, count: Long)

object UniqueVisitor {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val source = getClass.getResource("/UserBehavior.csv")
    val dataStream = env.readTextFile(source.getPath)
      .map(line => {
        val lineArray = line.split(",")
        UserBehavior(lineArray(0).trim.toLong, lineArray(1).trim.toLong, lineArray(2).trim.toInt,
          lineArray(3).trim, lineArray(4).trim.toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000)
      .filter(_.behavior == "pv")
      .timeWindowAll(Time.hours(1)) // no keyBy, so an all-window is used
      .apply(new UvCountByWindow()) // an AllWindowFunction produces the result directly
      .print()

    env.execute("Unique Visitor Job")
  }
}

class UvCountByWindow() extends AllWindowFunction[UserBehavior, UvCount, TimeWindow] {
  override def apply(window: TimeWindow, input: Iterable[UserBehavior], out: Collector[UvCount]): Unit = {
    // collect the userIds into a Set for deduplication
    var idSet = Set[Long]()
    for (userBehavior <- input) {
      idSet += userBehavior.userId
    }
    // emit the constructed result
    out.collect(UvCount(window.getEnd, idSet.size))
  }
}
[/code]
UV with a Bloom Filter + Redis
Once the distinct-user set no longer fits in memory, a Redis bitmap addressed through a Bloom-filter hash keeps just one bit per user, and the running count is stored in Redis as well.
[code]
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedis

object UvWithBloomFilter {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val source = getClass.getResource("/UserBehavior.csv")
    val stream = env.readTextFile(source.getPath)
      .map(data => {
        val dataArray = data.split(",")
        UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt,
          dataArray(3).trim, dataArray(4).trim.toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000)
      .filter(_.behavior == "pv")
      .map(data => ("dummyKey", data.userId)) // keep the userId for deduplication inside process
      .keyBy(_._1)
      .timeWindow(Time.hours(1))
      // Redis is used precisely because the window contents are too big for memory,
      // so process each element as it arrives instead of buffering
      .trigger(new MyTrigger())
      .process(new UvCountWithBloom())

    stream.print()
    env.execute("Uv With Bloom Job")
  }
}

class MyTrigger() extends Trigger[(String, Long), TimeWindow] {
  // called for every arriving element
  override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
    // fire the window for every element and purge its state,
    // so nothing accumulates inside the window itself
    TriggerResult.FIRE_AND_PURGE
  }

  // cleanup
  override def clear(window: TimeWindow, ctx: TriggerContext): Unit = {}

  // processing-time callback
  override def onProcessingTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }

  // event-time callback
  override def onEventTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }
}

// a simple Bloom filter
class Bloom(size: Long) extends Serializable {
  // total bitmap size; defaults to 2^27 bits = 16 MB
  private val cap = if (size > 0) size else 1 << 27

  // hash function
  def hash(value: String, seed: Int): Long = {
    var result = 0L
    for (i <- 0 until value.length) {
      // a prime seed scatters the results; folding in each character via charAt makes results more distinct
      result = result * seed + value.charAt(i)
    }
    // cap - 1 is 0111...11 in binary, so result & (cap - 1) keeps only the low-order bits,
    // i.e. an offset within the bitmap
    result & (cap - 1)
  }
}

class UvCountWithBloom() extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow] {
  // Redis connection
  lazy val jedis = new Jedis("localhost", 6379)
  lazy val bloom = new Bloom(1 << 29)

  override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {
    // bitmap storage: the key is windowEnd, the value is the bitmap
    val storeKey = context.window.getEnd.toString
    var count = 0L

    // the window state is purged each time, so the running count lives in Redis too:
    // a hash named "count" maps windowEnd -> uvCount, so read it back first
    if (jedis.hget("count", storeKey) != null) {
      count = jedis.hget("count", storeKey).toLong
    }

    // use the Bloom filter to check whether the current user was already seen;
    // the trigger admits one element per firing, so elements holds exactly one record
    // and last is safe; with a different trigger you would need to iterate
    val userId = elements.last._2.toString
    val offset = bloom.hash(userId, 61)
    // check whether this bit is set in the Redis bitmap
    val isExist = jedis.getbit(storeKey, offset)
    if (!isExist) {
      // not seen yet: set the bit and increment the count
      jedis.setbit(storeKey, offset, true)
      jedis.hset("count", storeKey, (count + 1).toString)
      out.collect(UvCount(storeKey.toLong, count + 1))
    } else {
      out.collect(UvCount(storeKey.toLong, count))
    }
  }
}
[/code]
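A quick standalone sanity check of the hashing logic (a sketch; the sample ids are made up): the same id must always map to the same offset, which is exactly what lets getbit/setbit deduplicate, while two distinct ids can still collide because only one hash function is used, showing up as a slight UV undercount.
[code]
object BloomSanityCheck extends App {
  val bloom = new Bloom(1 << 29)

  val a = bloom.hash("543462", 61)
  val b = bloom.hash("543462", 61)
  val c = bloom.hash("543463", 61)

  assert(a == b) // deterministic: same user, same bit
  println(s"offsets: a=$a, c=$c (both in [0, ${1L << 29}))")
  // With a single hash function a != c is likely but not guaranteed;
  // a collision silently counts two users as one.
}
[/code]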
App Marketing Statistics, by Channel
[code]
import java.util.UUID
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

// case classes again give the data some structure
case class MarketingUserBehavior(userId: String, behavior: String, channel: String, timestamp: Long)

case class MarketingCountView(windowStart: Long, windowEnd: Long, channel: String, behavior: String, count: Long)

object AppMarketingByChannel {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val dataStream = env.addSource(new SimulatedEventSource())
      .assignAscendingTimestamps(_.timestamp)
      .filter(_.behavior != "UNINSTALL")
      .map(data => {
        ((data.channel, data.behavior), 1L)
      })
      .keyBy(_._1) // (channel, behavior) acts as a composite key
      .timeWindow(Time.hours(1), Time.seconds(10))
      .process(new MarketingCountByChannel())

    dataStream.print()
    env.execute()
  }
}

// a mock data source; nothing to explain here
class SimulatedEventSource() extends RichSourceFunction[MarketingUserBehavior] {
  // flag marking whether the source is still running
  var running = true
  val channelSet: Seq[String] = Seq("AppStore", "XiaomiStore", "HuaweiStore", "weibo", "wechat", "tieba")
  val behaviorTypes: Seq[String] = Seq("BROWSE", "CLICK", "PURCHASE", "UNINSTALL")
  val rand: Random = Random

  override def cancel(): Unit = {
    running = false
  }

  override def run(ctx: SourceContext[MarketingUserBehavior]): Unit = {
    // how many records to produce
    val maxElements = 10L
    var count = 0L
    while (running && count < maxElements) {
      val id = UUID.randomUUID().toString
      val behaviorType = behaviorTypes(rand.nextInt(behaviorTypes.size))
      val channel = channelSet(rand.nextInt(channelSet.size))
      val ts = System.currentTimeMillis()
      ctx.collect(MarketingUserBehavior(id, behaviorType, channel, ts))
      count += 1
      TimeUnit.MILLISECONDS.sleep(5L)
    }
  }
}

// ProcessWindowFunction takes four type parameters, IN, OUT, KEY, WINDOW;
// work out the types and structures you need before writing one
class MarketingCountByChannel() extends ProcessWindowFunction[((String, String), Long), MarketingCountView, (String, String), TimeWindow] {
  override def process(key: (String, String), context: Context, elements: Iterable[((String, String), Long)], out: Collector[MarketingCountView]): Unit = {
    val startTs = context.window.getStart
    val endTs = context.window.getEnd
    val channel = key._1
    val behaviorType = key._2
    val count = elements.size
    out.collect(MarketingCountView(startTs, endTs, channel, behaviorType, count))
  }
}
[/code]
App Marketing Statistics, Total (No Channel)
Unlike the previous version, this one counts incrementally with an AggregateFunction, so the window never has to buffer its elements.
[code]
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object AppMarketing {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val dataStream = env.addSource(new SimulatedEventSource())
      .assignAscendingTimestamps(_.timestamp)
      .filter(_.behavior != "UNINSTALL")
      .map(data => {
        ("dummyKey", 1L)
      })
      .keyBy(_._1)
      .timeWindow(Time.hours(1), Time.seconds(10))
      .aggregate(new CountAgg(), new MarketingCountTotal())

    dataStream.print()
    env.execute()
  }
}

// these patterns have appeared often enough by now to need no further explanation
class CountAgg() extends AggregateFunction[(String, Long), Long, Long] {
  override def add(value: (String, Long), accumulator: Long): Long = accumulator + 1
  override def createAccumulator(): Long = 0L
  override def getResult(accumulator: Long): Long = accumulator
  override def merge(a: Long, b: Long): Long = a + b
}

class MarketingCountTotal() extends WindowFunction[Long, MarketingCountView, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[MarketingCountView]): Unit = {
    val startTs = window.getStart
    val endTs = window.getEnd
    val count = input.iterator.next()
    out.collect(MarketingCountView(startTs, endTs, "app marketing", "total", count))
  }
}
[/code]
Page Ad Statistics
Data source structure
userId | adId | province | city | timestamp |
---|---|---|---|---|
543462 | 1715 | beijing | beijing | 1511658000 |
[code]
import java.sql.Timestamp

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class AdClickEvent(userId: Long, adId: Long, province: String, city: String, timestamp: Long)

case class CountByProvince(windowEnd: String, province: String, count: Long)

// blacklist warning message
case class BlackListWarning(userId: Long, adId: Long, msg: String)

object AdStatistics {
  // the main output of the process function feeds the count below;
  // the blacklist goes out through a side output stream
  val blackListOutputTag: OutputTag[BlackListWarning] = new OutputTag[BlackListWarning]("blackList")

  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val resource = getClass.getResource("/AdClickLog.csv")
    val adEventStream = env.readTextFile(resource.getPath)
      .map(data => {
        val dataArray = data.split(",")
        AdClickEvent(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim,
          dataArray(3).trim, dataArray(4).trim.toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000)

    // a custom process function filters out click fraud and sends offenders to a side output
    val filterBlackListStream = adEventStream
      .keyBy(data => (data.userId, data.adId))
      .process(new FilterBlackListUser(30))

    val adCountStream = filterBlackListStream
      .keyBy(_.province)
      .timeWindow(Time.hours(1), Time.minutes(10))
      .aggregate(new AdCountAgg(), new AdCountResult())

    adCountStream.print()
    // call getSideOutput on whichever operator emitted the side stream
    filterBlackListStream.getSideOutput(blackListOutputTag).print("blackList")

    env.execute("AdStatistics Job")
  }

  class FilterBlackListUser(maxCount: Int) extends KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent] {
    // state holding the current user's click count on the current ad
    lazy val countState: ValueState[Long] = getRuntimeContext
      .getState(new ValueStateDescriptor[Long]("countState", classOf[Long]))
    // state marking whether this key was already reported to the blacklist
    lazy val isSentBlackList: ValueState[Boolean] = getRuntimeContext
      .getState(new ValueStateDescriptor[Boolean]("isSent-state", classOf[Boolean]))
    // state holding the reset timer's timestamp
    lazy val resetTimer: ValueState[Long] = getRuntimeContext
      .getState(new ValueStateDescriptor[Long]("resetTime-state", classOf[Long]))

    override def processElement(value: AdClickEvent, ctx: KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent]#Context, out: Collector[AdClickEvent]): Unit = {
      // read the count state
      val curCount = countState.value()

      // on the first element for this key, register a timer that fires at 00:00 the next day
      if (curCount == 0) {
        val ts = (ctx.timerService().currentProcessingTime() / (1000 * 60 * 60 * 24) + 1) * (1000 * 60 * 60 * 24)
        resetTimer.update(ts)
        ctx.timerService().registerProcessingTimeTimer(ts)
      }

      // if the count has reached the limit, blacklist the user
      if (curCount >= maxCount) {
        // report to the blacklist only once
        if (!isSentBlackList.value()) {
          isSentBlackList.update(true)
          // emit to the side output
          ctx.output(blackListOutputTag, BlackListWarning(value.userId, value.adId,
            "Click over " + maxCount + " times today."))
        }
        return // over the limit, so the record must not reach the main output
      }

      // increment the count and forward the record to the main output
      countState.update(curCount + 1)
      out.collect(value)
    }

    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent]#OnTimerContext, out: Collector[AdClickEvent]): Unit = {
      // when the timer fires, clear all state
      if (timestamp == resetTimer.value()) {
        isSentBlackList.clear()
        countState.clear()
        resetTimer.clear()
      }
    }
  }

  class AdCountAgg() extends AggregateFunction[AdClickEvent, Long, Long] {
    override def createAccumulator(): Long = 0L
    override def merge(acc: Long, acc1: Long): Long = acc + acc1
    override def getResult(acc: Long): Long = acc
    override def add(in: AdClickEvent, acc: Long): Long = acc + 1
  }

  class AdCountResult() extends WindowFunction[Long, CountByProvince, String, TimeWindow] {
    override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[CountByProvince]): Unit = {
      out.collect(CountByProvince(new Timestamp(window.getEnd).toString, key, input.iterator.next))
    }
  }
}
[/code]
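The reset timestamp in processElement rounds the current processing time down to a whole number of days via integer division, then adds one day, which gives the next midnight in UTC. A small worked example (the sample epoch value is made up for illustration):
[code]
// Worked example of the daily-reset arithmetic (sample value is illustrative).
val dayMillis = 1000L * 60 * 60 * 24 // 86,400,000 ms per day

val now = 1511696523000L // 2017-11-26 11:42:03 UTC
val nextMidnight = (now / dayMillis + 1) * dayMillis

// now / dayMillis truncates to 17496 whole days since the epoch;
// (17496 + 1) * dayMillis = 1511740800000L = 2017-11-27 00:00:00 UTC.
// The rounding is in UTC; add or subtract a zone offset first for local midnight.
println(nextMidnight) // 1511740800000
[/code]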