
[Scala] Flink Project: Real-Time Traffic Statistics (2)

2020-01-13 15:58

Series Navigation

[Scala] Flink Project Example Series Overview (0)
[Scala] Flink Project: Real-Time Hot Items Statistics (1)
[Scala] Flink Project: Real-Time Traffic Statistics (2)
[Scala] Flink Project: Malicious Login Monitoring (3)
[Scala] Flink Project: Order Payment Timeout Monitoring (4)
[Scala] Flink Project: Real-Time Order Payment Reconciliation (5)
[Scala] Flink Project: A Little Easter Egg (6)
The code and data files for this project are available here; the access code is 3n9z.

Project Requirements

The project breaks down into six sub-requirements:

  • Count visits per IP per minute, take the 5 most-visited addresses, refreshed every 5 seconds
  • Count page views (PV) per hour
  • Count the site's unique visitors (UV) per hour
  • Same as the previous requirement, but assume the data volume is far too large to compute in memory
  • Count user behaviors (browse, click, purchase, etc.) per channel (e.g., different app stores)
  • Count ad clicks per region per hour, refreshed every 10 minutes; blacklist users who click a single ad too frequently, for one day

Approach

One for each of the six requirements above:

  • Similar to "hot items statistics"; omitted
  • Simply map the stream to (data, 1), then sum per window
  • Deduplicate on top of the PV logic using a Set
  • Use a Bloom filter together with Redis's setbit operation
  • With channels: combine channel and behavior into a tuple, like a composite key, and count per window; without channels: aggregate directly, similar to "hot items statistics" but with no ranking needed
  • The counting part is similar to earlier requirements; the blacklist is emitted through a side output, and the one-day limit is implemented with a timer

Code Implementation

Hot Pages Statistics

Data source structure

ip userId eventTime method url
50.16.19.13 - 17/05/2015:10:05:10 GET /blog/tags/puppet?flav=rss20
[code]import java.text.SimpleDateFormat
import java.sql.Timestamp

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

/*
Requirement: every 5 seconds, output the top N most-visited URLs of the last 10 minutes.
Similar to the HotItems requirement.
*/
case class ApacheLogEvent(ip: String, userId: String, eventTime: Long, method: String, url: String)
case class UrlViewCount(url: String, windowEnd: Long, count: Long)

object NetworkFlow {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val source = getClass.getResource("/apache.log")
    val dataStream = env.readTextFile(source.getPath)
      .map(line => {
        val lineArray = line.split(" ")
        val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
        val timestamp = simpleDateFormat.parse(lineArray(3)).getTime
        // field indices follow the layout of the project's apache.log file
        ApacheLogEvent(lineArray(0), lineArray(2), timestamp, lineArray(5), lineArray(6))
      })
      // Time.seconds(1) is the watermark delay; tune it to how disordered the data
      // actually is. Events later than that can still be caught by the window
      // (allowedLateness), and as a last resort by a side output.
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.seconds(1)) {
        override def extractTimestamp(t: ApacheLogEvent): Long = t.eventTime // must be in milliseconds; parse().getTime already is
      }) // out-of-order data needs an explicit watermark
      .filter(data => {
        val pattern = "^((?!\\.(css|js|png|ico)$).)*$".r // drop static resources; https://regex101.com is handy for writing regexes
        (pattern findFirstIn data.url).nonEmpty
      })
      .keyBy(_.url)
      .timeWindow(Time.minutes(10), Time.seconds(5))
      .aggregate(new CountAgg(), new WindowResult())

    val processedStream = dataStream
      .keyBy(_.windowEnd)
      .process(new TopNHotUrls(5))

    //    dataStream.print("aggregate")
    processedStream.print("process")

    env.execute("network flow job")
  }
}

class CountAgg() extends AggregateFunction[ApacheLogEvent, Long, Long] {
  override def createAccumulator(): Long = 0L

  override def merge(acc: Long, acc1: Long): Long = acc + acc1

  override def getResult(acc: Long): Long = acc

  override def add(in: ApacheLogEvent, acc: Long): Long = acc + 1
}

class WindowResult() extends WindowFunction[Long, UrlViewCount, String, TimeWindow] {
  override def apply(url: String, window: TimeWindow, input: Iterable[Long], out: Collector[UrlViewCount]): Unit = {
    val count = input.iterator.next
    out.collect(UrlViewCount(url, window.getEnd, count))
  }
}

class TopNHotUrls(topNum: Int) extends KeyedProcessFunction[Long, UrlViewCount, String] {
  private var urlState: ListState[UrlViewCount] = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters) // the base implementation is a no-op, so this call is harmless but optional
    val urlStateDesc = new ListStateDescriptor[UrlViewCount]("urlState-state", classOf[UrlViewCount])
    urlState = getRuntimeContext.getListState(urlStateDesc)
  }

  override def processElement(value: UrlViewCount, ctx: KeyedProcessFunction[Long, UrlViewCount, String]#Context,
                              out: Collector[String]): Unit = {
    urlState.add(value)
    ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, UrlViewCount, String]#OnTimerContext,
                       out: Collector[String]): Unit = {
    val allUrlViews: ListBuffer[UrlViewCount] = ListBuffer()
    import scala.collection.JavaConversions._
    for (urlView <- urlState.get) { // alternatively, take urlState.get().iterator() and step through it with hasNext/next
      allUrlViews += urlView
    }
    urlState.clear()
    // val sortedUrlViews = allUrlViews.sortBy(_.count)(Ordering.Long.reverse).take(topNum)
    // an equivalent way to sort:
    val sortedUrlViews = allUrlViews.sortWith(_.count > _.count).take(topNum)

    val result: StringBuilder = new StringBuilder
    result.append("====================================\n")
    result.append("Time: ").append(new Timestamp(timestamp - 1)).append("\n")

    for (i <- sortedUrlViews.indices) {
      val currentUrlView: UrlViewCount = sortedUrlViews(i)
      result.append("No").append(i + 1).append(":").append(" URL=").append(currentUrlView.url)
        .append(" count=").append(currentUrlView.count).append("\n")
    }
    result.append("====================================\n\n")
    // throttle output to simulate a rolling real-time display
    Thread.sleep(1000)
    out.collect(result.toString)
  }
}
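
The comment on the watermark above mentions an escalation path for late events: a small watermark delay first, then the window's allowed lateness, then a side output. A minimal sketch of the last two steps, assuming a `parsedStream` variable holding the stream after the map/watermark/filter steps (the original pipeline does all of this inline):

[code]// Sketch only: catching events that arrive after the 1-second watermark delay.
val lateTag = new OutputTag[ApacheLogEvent]("late-data") // illustrative tag name

val aggStream = parsedStream
  .keyBy(_.url)
  .timeWindow(Time.minutes(10), Time.seconds(5))
  .allowedLateness(Time.minutes(1))  // keep window state around for stragglers up to 1 minute late
  .sideOutputLateData(lateTag)       // anything later still lands here instead of being dropped
  .aggregate(new CountAgg(), new WindowResult())

// inspect or reprocess the leftovers
val lateStream = aggStream.getSideOutput(lateTag)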

PV

Data source structure

userId itemId categoryId behavior timestamp
543462 1715 1464116 pv 1511658000
[code]import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// case class for the input records
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
// case class for the windowed aggregation result
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

object PageView {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val source = getClass.getResource("/UserBehavior.csv")
    val dataStream = env.readTextFile(source.getPath)
      .map(line => {
        val lineArray = line.split(",")
        UserBehavior(lineArray(0).trim.toLong, lineArray(1).trim.toLong,
          lineArray(2).trim.toInt, lineArray(3).trim, lineArray(4).trim.toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000)

    // the pipeline below is straightforward: one ("pv", 1) per event, keyed and summed per hour
    dataStream
      .filter(_.behavior == "pv")
      .map(_ => ("pv", 1))
      .keyBy(_._1)
      .timeWindow(Time.hours(1))
      .sum(1)
      .print()

    env.execute("PV Job")
  }
}
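
The running sum printed above does not say which hour it belongs to. A small variant (a sketch; `PvCountAgg` and `PvWindowResult` are names I made up, following the same aggregate pattern as the hot-pages job) attaches the window end to each hourly count:

[code]// Needs the AggregateFunction, WindowFunction, TimeWindow and Collector
// imports shown in the hot-pages example above.
class PvCountAgg extends AggregateFunction[(String, Int), Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(in: (String, Int), acc: Long): Long = acc + 1
  override def getResult(acc: Long): Long = acc
  override def merge(a: Long, b: Long): Long = a + b
}

class PvWindowResult extends WindowFunction[Long, (Long, Long), String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long],
                     out: Collector[(Long, Long)]): Unit =
    out.collect((window.getEnd, input.iterator.next()))
}

// usage: swap .sum(1) above for
// .aggregate(new PvCountAgg(), new PvWindowResult())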

UV

[code]import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class UvCount(windowEnd: Long, count: Long)

object UniqueVisitor {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val source = getClass.getResource("/UserBehavior.csv")
    env.readTextFile(source.getPath)
      .map(line => {
        val lineArray = line.split(",")
        UserBehavior(lineArray(0).trim.toLong, lineArray(1).trim.toLong,
          lineArray(2).trim.toInt, lineArray(3).trim, lineArray(4).trim.toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000)
      .filter(_.behavior == "pv")
      .timeWindowAll(Time.hours(1)) // no keyBy, so an all-window is used
      .apply(new UvCountByWindow()) // an AllWindowFunction produces the result directly
      .print()

    env.execute("Unique Visitor Job")
  }
}

class UvCountByWindow() extends AllWindowFunction[UserBehavior, UvCount, TimeWindow] {
  override def apply(window: TimeWindow, input: Iterable[UserBehavior], out: Collector[UvCount]): Unit = {
    // collect the userIds into a Set to deduplicate
    var idSet = Set[Long]()
    for (userBehavior <- input) {
      idSet += userBehavior.userId
    }

    // emit the result
    out.collect(UvCount(window.getEnd, idSet.size))
  }
}
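
Note that the all-window above buffers every UserBehavior record until the window fires. A hedged alternative (a sketch; the class names are mine) deduplicates incrementally with an AggregateFunction whose accumulator is a Set, so only the distinct ids are kept. Memory is still proportional to the number of distinct users, which is exactly the limit the Bloom-filter version below removes:

[code]// Needs an org.apache.flink.api.common.functions.AggregateFunction import.
class UvAgg extends AggregateFunction[UserBehavior, Set[Long], Long] {
  override def createAccumulator(): Set[Long] = Set.empty[Long]
  override def add(in: UserBehavior, acc: Set[Long]): Set[Long] = acc + in.userId
  override def getResult(acc: Set[Long]): Long = acc.size.toLong
  override def merge(a: Set[Long], b: Set[Long]): Set[Long] = a ++ b
}

class UvResult extends AllWindowFunction[Long, UvCount, TimeWindow] {
  override def apply(window: TimeWindow, input: Iterable[Long], out: Collector[UvCount]): Unit =
    out.collect(UvCount(window.getEnd, input.iterator.next()))
}

// usage: swap .apply(new UvCountByWindow()) above for
// .aggregate(new UvAgg(), new UvResult())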

UV with a Bloom Filter + Redis

[code]import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedis

object UvWithBloomFilter {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val source = getClass.getResource("/UserBehavior.csv")
    val stream = env.readTextFile(source.getPath)
      .map(data => {
        val dataArray = data.split(",")
        UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2)
          .trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000)
      .filter(_.behavior == "pv")
      .map(data => ("dummyKey", data.userId)) // keep the userId for the dedup check inside process
      .keyBy(_._1)
      .timeWindow(Time.hours(1))
      // Redis is used precisely because the data is too large to hold in window
      // state, so process each element as it arrives instead of buffering
      .trigger(new MyTrigger())
      .process(new UvCountWithBloom())

    stream.print()
    env.execute("Uv With Bloom Job")
  }
}

class MyTrigger() extends Trigger[(String, Long), TimeWindow] {
  // called for every element that enters the window
  override def onElement(element: (String, Long), timestamp: Long,
                         window: TimeWindow, ctx: TriggerContext): TriggerResult = {
    // fire the window and purge its state on every element; this could also be
    // adapted to fire per small batch rather than per single record
    TriggerResult.FIRE_AND_PURGE
  }

  // cleanup
  override def clear(window: TimeWindow, ctx: TriggerContext): Unit = {}

  // processing-time callback
  override def onProcessingTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }

  // event-time callback
  override def onEventTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }
}

// a simple Bloom filter
class Bloom(size: Long) extends Serializable {
  // total bitmap size in bits; defaults to 1 << 27 (16 MB)
  private val cap = if (size > 0) size else 1 << 27

  // hash function mapping a string to a bit offset
  def hash(value: String, seed: Int): Long = {
    var result = 0L
    for (i <- 0 until value.length) {
      // the seed is usually a prime so results scatter well; folding in each
      // character makes the result differ as much as possible between strings
      result = result * seed + value.charAt(i)
    }
    // cap - 1 is 0b0111...1, so ANDing keeps only the low-order bits of result,
    // i.e. an offset that stays inside the bitmap
    result & (cap - 1)
  }
}

class UvCountWithBloom() extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow] {

  // Redis connection
  lazy val jedis = new Jedis("localhost", 6379)
  lazy val bloom = new Bloom(1 << 29)

  override def process(key: String, context: Context, elements: Iterable[(String, Long)],
                       out: Collector[UvCount]): Unit = {
    // the bitmap is stored under the windowEnd as key
    val storeKey = context.window.getEnd.toString
    var count = 0L
    // because the trigger purges the window each time, the running count also lives
    // in Redis: a hash named "count" mapping windowEnd -> uvCount, read back first
    if (jedis.hget("count", storeKey) != null) {
      count = jedis.hget("count", storeKey).toLong
    }
    // use the Bloom filter to decide whether the current user was seen before;
    // the trigger fires per element, so elements holds exactly one record and
    // last is safe here -- with batching you would iterate instead
    val userId = elements.last._2.toString
    val offset = bloom.hash(userId, 61)
    // check whether this bit is already set in the Redis bitmap
    val isExist = jedis.getbit(storeKey, offset)
    if (!isExist) {
      // not seen yet: set the bit and increment the count
      jedis.setbit(storeKey, offset, true)
      jedis.hset("count", storeKey, (count + 1).toString)
      out.collect(UvCount(storeKey.toLong, count + 1))
    } else {
      out.collect(UvCount(storeKey.toLong, count))
    }
  }
}
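
The Redis mechanics the window function relies on can also be tried outside Flink. A standalone sketch (assuming a local Redis on the default port; the key name is illustrative):

[code]// One round trip of the dedup check, without the Flink job around it.
object BloomRoundTrip extends App {
  val jedis = new redis.clients.jedis.Jedis("localhost", 6379)
  val bloom = new Bloom(1 << 29)

  val offset = bloom.hash("543462", 61)          // deterministic bit offset for a userId
  val seen = jedis.getbit("window-demo", offset) // is that bit already set?
  if (!seen) jedis.setbit("window-demo", offset, true)
  println(s"offset=$offset alreadySeen=$seen")
}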

App Marketing (by channel)

[code]import java.util.UUID
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

// again, case classes give the data some structure
case class MarketingUserBehavior(userId: String, behavior: String, channel: String, timestamp: Long)
case class MarketingCountView(windowStart: Long, windowEnd: Long, channel: String, behavior: String, count: Long)

object AppMarketingByChannel {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val dataStream = env.addSource(new SimulatedEventSource())
      .assignAscendingTimestamps(_.timestamp)
      .filter(_.behavior != "UNINSTALL")
      .map(data => {
        ((data.channel, data.behavior), 1L)
      })
      .keyBy(_._1)
      .timeWindow(Time.hours(1), Time.seconds(10))
      .process(new MarketingCountByChannel())
    dataStream.print()

    env.execute()
  }
}

// a simple mock data source
class SimulatedEventSource() extends RichSourceFunction[MarketingUserBehavior] {

  // flag controlling whether the source keeps running
  var running = true
  val channelSet: Seq[String] = Seq("AppStore", "XiaomiStore", "HuaweiStore", "weibo", "wechat", "tieba")
  val behaviorTypes: Seq[String] = Seq("BROWSE", "CLICK", "PURCHASE", "UNINSTALL")
  val rand: Random = Random

  override def cancel(): Unit = {
    running = false
  }

  override def run(ctx: SourceContext[MarketingUserBehavior]): Unit = {

    // how many records to produce
    val maxElements = 10L
    var count = 0L

    while (running && count < maxElements) {
      val id = UUID.randomUUID().toString
      val behaviorType = behaviorTypes(rand.nextInt(behaviorTypes.size))
      val channel = channelSet(rand.nextInt(channelSet.size))
      val ts = System.currentTimeMillis()

      ctx.collect(MarketingUserBehavior(id, behaviorType, channel, ts))

      count += 1
      TimeUnit.MILLISECONDS.sleep(5L)
    }
  }
}

// ProcessWindowFunction takes four type parameters: IN, OUT, KEY, WINDOW --
// pick them to match the data you actually need
class MarketingCountByChannel() extends ProcessWindowFunction[((String, String), Long),
  MarketingCountView, (String, String), TimeWindow] {

  override def process(key: (String, String), context: Context, elements: Iterable[((String, String), Long)],
                       out: Collector[MarketingCountView]): Unit = {
    val startTs = context.window.getStart
    val endTs = context.window.getEnd
    val channel = key._1
    val behaviorType = key._2
    // elements.size walks every buffered record; the aggregate-based variant in
    // the next section counts incrementally instead
    val count = elements.size
    out.collect(MarketingCountView(startTs, endTs, channel, behaviorType, count))
  }
}

App Marketing (totals, without channel)

[code]import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object AppMarketing {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val dataStream = env.addSource(new SimulatedEventSource())
      .assignAscendingTimestamps(_.timestamp)
      .filter(_.behavior != "UNINSTALL")
      .map(_ => ("dummyKey", 1L))
      .keyBy(_._1)
      .timeWindow(Time.hours(1), Time.seconds(10))
      .aggregate(new CountAgg(), new MarketingCountTotal())

    dataStream.print()

    env.execute()
  }
}

// the same incremental-aggregation pattern used throughout this series
class CountAgg() extends AggregateFunction[(String, Long), Long, Long] {
  override def add(value: (String, Long), accumulator: Long): Long = accumulator + 1

  override def createAccumulator(): Long = 0L

  override def getResult(accumulator: Long): Long = accumulator

  override def merge(a: Long, b: Long): Long = a + b
}

class MarketingCountTotal() extends WindowFunction[Long, MarketingCountView, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long],
                     out: Collector[MarketingCountView]): Unit = {
    val startTs = window.getStart
    val endTs = window.getEnd
    val count = input.iterator.next()
    out.collect(MarketingCountView(startTs, endTs, "app marketing", "total", count))
  }
}

Page Ad Statistics

Data source structure

userId adId province city timestamp
543462 1715 beijing beijing 1511658000
[code]import java.sql.Timestamp

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class AdClickEvent(userId: Long, adId: Long, province: String, city: String, timestamp: Long)

case class CountByProvince(windowEnd: String, province: String, count: Long)

// blacklist warning message
case class BlackListWarning(userId: Long, adId: Long, msg: String)

object AdStatistics {

  // the main output of the process function feeds the counting below;
  // the blacklist is taken from a side output
  val blackListOutputTag: OutputTag[BlackListWarning] = new OutputTag[BlackListWarning]("blackList")

  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val resource = getClass.getResource("/AdClickLog.csv")
    val adEventStream = env.readTextFile(resource.getPath)
      .map(data => {
        val dataArray = data.split(",")
        AdClickEvent(dataArray(0).trim.toLong, dataArray(1).trim.toLong,
          dataArray(2).trim, dataArray(3).trim, dataArray(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000)

    // a custom process function filters out click fraud; filtered users go to the side output
    val filterBlackListStream = adEventStream
      .keyBy(data => (data.userId, data.adId))
      .process(new FilterBlackListUser(30))

    val adCountStream = filterBlackListStream
      .keyBy(_.province)
      .timeWindow(Time.hours(1), Time.minutes(10))
      .aggregate(new AdCountAgg(), new AdCountResult())

    adCountStream.print()
    // call getSideOutput on whichever operator emitted the side stream
    filterBlackListStream.getSideOutput(blackListOutputTag).print("blackList")

    env.execute("AdStatistics Job")
  }

  class FilterBlackListUser(maxCount: Int) extends KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent] {

    // state holding the current user's click count on the current ad
    lazy val countState: ValueState[Long] = getRuntimeContext
      .getState(new ValueStateDescriptor[Long]("countState", classOf[Long]))
    // state recording whether a blacklist warning was already sent
    lazy val isSentBlackList: ValueState[Boolean] = getRuntimeContext
      .getState(new ValueStateDescriptor[Boolean]("isSent-state", classOf[Boolean]))
    // state holding the timestamp of the reset timer
    lazy val resetTimer: ValueState[Long] = getRuntimeContext
      .getState(new ValueStateDescriptor[Long]("resetTime-state", classOf[Long]))

    override def processElement(value: AdClickEvent, ctx: KeyedProcessFunction[(Long, Long),
      AdClickEvent, AdClickEvent]#Context, out: Collector[AdClickEvent]): Unit = {
      // read the current count
      val curCount = countState.value()

      // on the first element, register a timer for the next midnight (00:00 UTC, processing time)
      if (curCount == 0) {
        val ts = (ctx.timerService().currentProcessingTime() / (1000 * 60 * 60 * 24) + 1) * (1000 * 60 * 60 * 24)
        resetTimer.update(ts)
        ctx.timerService().registerProcessingTimeTimer(ts)
      }

      // if the count has reached the limit, blacklist the user
      if (curCount >= maxCount) {
        // send the warning only once
        if (!isSentBlackList.value()) {
          isSentBlackList.update(true)
          // emit to the side output
          ctx.output(blackListOutputTag, BlackListWarning(value.userId, value.adId,
            "Click over " + maxCount + " times today."))
        }
        return // over the limit: do not forward the event to the main stream
      }
      // increment the count and forward the event
      countState.update(curCount + 1)
      out.collect(value)
    }

    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[(Long, Long), AdClickEvent,
      AdClickEvent]#OnTimerContext, out: Collector[AdClickEvent]): Unit = {
      // clear all state when the reset timer fires
      if (timestamp == resetTimer.value()) {
        isSentBlackList.clear()
        countState.clear()
        resetTimer.clear()
      }
    }
  }

  class AdCountAgg() extends AggregateFunction[AdClickEvent, Long, Long] {
    override def createAccumulator(): Long = 0L

    override def merge(acc: Long, acc1: Long): Long = acc + acc1

    override def getResult(acc: Long): Long = acc

    override def add(in: AdClickEvent, acc: Long): Long = acc + 1
  }

  class AdCountResult() extends WindowFunction[Long, CountByProvince, String, TimeWindow] {
    override def apply(key: String, window: TimeWindow, input: Iterable[Long],
                       out: Collector[CountByProvince]): Unit = {
      out.collect(CountByProvince(new Timestamp(window.getEnd).toString, key, input.iterator.next))
    }
  }
}
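
One detail worth spelling out (my observation, not from the original): the reset timer truncates processing time to whole days, which yields midnight in UTC, not local time:

[code]// The timer arithmetic from processElement, worked through:
val dayMs = 1000L * 60 * 60 * 24
val now = 1511661000000L                      // e.g. 2017-11-26 01:50:00 UTC
val nextMidnight = (now / dayMs + 1) * dayMs  // 1511740800000 = 2017-11-27 00:00:00 UTC
// For local-time midnight, shift by the zone offset first,
// e.g. using java.time.ZonedDateTime.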