
[Translation] Introduction to Monoids and Semigroups with Spark

2015-07-17 21:35

What on earth is a monoid??

Definition:

A monoid (translator's note: rendered in Chinese as 幺半群, following reference 1; the English term is used throughout) is a set with an associative binary operation (+) and an identity element i such that for any x, x + i = i + x = x. Note that, unlike a group, it does not require inverse elements. Equivalently, a monoid is a semigroup with an identity element.

Wow, not very helpful. Let's look at some examples first and then come back to a simpler definition…

https://blog.safaribooksonline.com/2013/05/15/monoids-for-programmers-a-scala-example/

1. Integers under addition

Associativity => (a+b)+c == a+(b+c) and identity => 0+n == n+0 == n

2. Integers under multiplication

Associativity => (a*b)*c == a*(b*c) and identity => 1*n == n*1 == n

3. Lists under concatenation

Associativity => List(1,2)+(List(3,4)+List(5,6)) == (List(1,2)+List(3,4))+List(5,6) == List(1,2,3,4,5,6) and identity => List(1)+List() == List()+List(1) == List(1)
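
A quick REPL check of the list case (using ++, Scala's list concatenation operator):

// Verifying associativity and identity for list concatenation
(List(1,2) ++ (List(3,4) ++ List(5,6))) == ((List(1,2) ++ List(3,4)) ++ List(5,6)) // true
(List(1) ++ List()) == (List() ++ List(1)) // true: both are List(1)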

It might seem as though any binary operation forms a monoid. Can we come up with some counterexamples?

For example, averaging:

avg(1,2)

avg(10, avg(20,30)) != avg(avg(10,20), 30)

Subtraction!
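
A minimal sketch checking both counterexamples (the avg helper here is illustrative, not from the original post):

// Averaging is not associative, so numbers under avg do not even form a semigroup.
def avg(a: Double, b: Double): Double = (a + b) / 2

avg(10, avg(20, 30)) // avg(10, 25) == 17.5
avg(avg(10, 20), 30) // avg(15, 30) == 22.5

// Subtraction fails associativity as well:
(10 - 5) - 3 // 2
10 - (5 - 3) // 8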

It turns out a semigroup is simply a monoid minus the requirement of an identity element, so it is a more inclusive notion.

The key is the associativity requirement on the binary operation: it means we do not care how the computation is grouped! It also means the computation is easy to run in parallel.
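
A minimal sketch of that claim using Scala's parallel collections (built in up to Scala 2.12; a separate module in 2.13):

val xs = (1 to 1000).toList

// Sequential: evaluated as (((1 + 2) + 3) + ...)
println(xs.reduce(_ + _))     // 500500

// Parallel: chunks are reduced independently, then combined.
// Associativity guarantees the regrouping cannot change the result.
println(xs.par.reduce(_ + _)) // 500500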

A monoid is essentially a contract that a particular type abides by. Does that give us any hints for implementing monoids/semigroups in Scala?

Implementing Monoids with Typeclasses

//It is up to the developer to enforce the associativity rule!!!
trait SemiGroup[T]{
  def op(a: T, b: T): T
}

trait Monoid[T] extends SemiGroup[T]{
  def zero: T
}


Now that we know how to express a monoid in Scala, can we implement an IntAdditionMonoid?

object Monoids{
  implicit object IntAdditionMonoid extends Monoid[Int]{
    def op(a: Int, b: Int): Int = a + b
    def zero: Int = 0
  }
}


Nice. Now where can we use it? Let's see how to put it to work in a method such as reduce…

trait SemiGroup[T]{
  def op(a: T, b: T): T
}

trait Monoid[T] extends SemiGroup[T]{
  def zero: T
}

implicit object IntAdditionMonoid extends Monoid[Int]{
  def op(a: Int, b: Int): Int = a + b
  def zero: Int = 0
}

val listA = List(1,3,5,6)

def reduceWithMonoid[A](seq: Seq[A])(implicit ev: Monoid[A]): A = {
  seq.reduce(ev.op(_,_))
}

println(reduceWithMonoid(listA))
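
This prints 15: the reduction logic comes entirely from the implicit IntAdditionMonoid.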


Let's define a few more and see how they behave.

trait SemiGroup[T]{
  def op(a: T, b: T): T
}

trait Monoid[T] extends SemiGroup[T]{
  def zero: T
}

implicit object IntAdditionMonoid extends Monoid[Int]{
  def op(a: Int, b: Int): Int = a + b
  def zero: Int = 0
}

//We now must use a class, since type parameters are required: tuples are themselves parameterized types.
//Here our goal is to define functionality for tuples whose components are monoid-abiding types
class Tuple2SemiGroup[A,B]()(implicit sg1: SemiGroup[A], sg2: SemiGroup[B]) extends SemiGroup[(A,B)]{
  def op(a: (A,B), b: (A,B)): (A,B) = (sg1.op(a._1, b._1), sg2.op(a._2, b._2))
}

//We cannot make the above an implicit class, because an implicit class actually does something different (more on this in an aside about the pimp my library pattern soon).
//Instead we can use another feature of implicits: implicit conversions. This function says how, given SemiGroups for the components, to produce a SemiGroup for the tuple itself
implicit def tuple2SemiGroup[A,B](implicit sg1: SemiGroup[A], sg2: SemiGroup[B]): SemiGroup[(A,B)] = {
  new Tuple2SemiGroup[A,B]()(sg1, sg2)
}

val listA = List((1,2),(3,4),(5,2),(6,9))

def reduceWithMonoid[A](seq: Seq[A])(implicit ev: SemiGroup[A]): A = {
  seq.reduce(ev.op(_,_))
}

println(reduceWithMonoid(listA))
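
This prints (15,17); each component of the tuple is reduced through its own SemiGroup instance.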



Notice how the aggregation logic is contained in the monoid definitions. In effect we can define behavior once per type and reuse it across collections of objects, which makes for highly reusable, extensible code. One more example and then we move on to Spark.

For one more example, semigroups can easily be applied to merging two maps: keys are matched up and their values summed.

trait SemiGroup[T]{
  def op(a: T, b: T): T
}

trait Monoid[T] extends SemiGroup[T]{
  def zero: T
}

implicit object IntAdditionMonoid extends Monoid[Int]{
  def op(a: Int, b: Int): Int = a + b
  def zero: Int = 0
}

//Here we only need to assume that the values can form a SemiGroup, as the keys are just being matched up.
class MapSemiGroup[K,V]()(implicit sg1: SemiGroup[V]) extends SemiGroup[Map[K,V]]{
  //We aggregate with one map as the initial value and loop through the key/value pairs of the other, combining as we go.
  //This way any keys that don't appear in the looping map are there already; all keys that appear in both are overwritten with the combined value
  def op(iteratingMap: Map[K,V], startingMap: Map[K,V]): Map[K,V] = iteratingMap.aggregate(startingMap)({
    (currentMap: Map[K,V], kv: (K,V)) => {
      val newValue: V = startingMap.get(kv._1).map(v => sg1.op(v, kv._2)).getOrElse(kv._2)
      currentMap + (kv._1 -> newValue)
    }
  },
  //This is the combine part (if done in parallel, we could have two partial maps that need to be merged); it assumes that all keys are already combined....
  {
    (mapOne: Map[K,V], mapTwo: Map[K,V]) => mapOne ++ mapTwo
  }
  )
}

//Again, we cannot make the above an implicit class because that actually does something different (more on this in an aside about the pimp my library pattern soon).
//Instead we use an implicit conversion: given a SemiGroup for the values, this function produces a SemiGroup for the map itself
implicit def mapSemiGroup[K,V](implicit sg1: SemiGroup[V]): SemiGroup[Map[K,V]] = {
  new MapSemiGroup[K,V]()(sg1)
}

val mapA = Map("A" -> 1, "B" -> 2, "D" -> 5)
val mapB = Map("A" -> 3, "C" -> 3, "D" -> 1)
val mapC = Map("B" -> 10, "D" -> 3)

def reduceWithMonoid[A](seq: Seq[A])(implicit ev: SemiGroup[A]): A = {
  seq.reduce(ev.op(_,_))
}

println(reduceWithMonoid(List(mapA, mapB, mapC)))
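
This prints the combined map Map(B -> 12, D -> 9, A -> 4, C -> 3) (key order may vary): values under matching keys are summed via IntAdditionMonoid.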


A Pimp My Library example (translator's note: see reference 3), and why we don't use implicit classes

//An implicit class takes a constructor argument, which is the class to be pimped. You can then define methods etc. which will be "available" on that type as though they were native functionality!!!!
implicit class PimpedString(s: String){
  def pimpMyString(): String = s + " is pimped"
}

println("My String".pimpMyString())


Spark uses the Pimp My Library pattern to add methods that are only available on RDDs of specific types, e.g. key-value pair RDDs:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

/**
 * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
 */
implicit class PairRDDFunctions[K, V](self: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
  extends Logging
  with SparkHadoopMapReduceUtil
  with Serializable
{
}


At this point it looks like you're expected to design your own monoid/semigroup library. Don't worry: Twitter has already done it and made it usable with Spark! (Meaning everything is serializable.) They have also written it in a way such that it performs very well.

https://github.com/twitter/algebird
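
A minimal sketch of Algebird in use (assuming algebird-core is on the classpath; Monoid.plus and Monoid.sum resolve the right instance implicitly):

import com.twitter.algebird.Monoid

// Algebird ships monoid instances for primitives, tuples, maps and more.
println(Monoid.plus(1, 2))                                   // 3
println(Monoid.plus(Map("a" -> 1, "b" -> 3), Map("a" -> 2))) // Map(a -> 3, b -> 3)
println(Monoid.sum(List((1, 2.0), (3, 4.0))))                // (4, 6.0)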

Putting it together: Monoids with Spark

When aggregating RDDs in Spark we need to write many functions which, unfortunately, all look roughly the same yet are hard to write generically. Monoids are one way to get there. Here's a real-world example:

//This is a call from an aggregation section that updates state with the HyperLogLog object
val stateUniques = makeModelUniquesTime.updateStateByKey(updateTotalCountState[HLL])
//This is a call from an aggregation section that updates state with the Long
val statePV = makeModelCountReduceWithTime.updateStateByKey(updateTotalCountState[Long])

//This was originally implemented as two methods, one for HLL and one for Long. With monoids we can write a single method that takes care of both cases.
def updateTotalCountState[U](values: Seq[(BananaTimestamp, U)], state: Option[(BananaTimestamp, U)])(implicit monoid: Monoid[U], ct: ClassTag[U]): Option[(BananaTimestamp, U)] = {
  val defaultState = (null, monoid.zero)
  values match {
    case Nil => Some(state.getOrElse(defaultState))
    case _ =>
      val hdT = values(0)._1
      // The reduction logic is now contained in the monoid definitions as opposed to these functions. We can instead distill this down to what it takes to update state
      val v = values.map{case (_, a) => a}.reduce(monoid.plus)
      val stateReceived = state.getOrElse(defaultState)
      if(checkResetState(stateReceived._1, hdT)) Some((hdT, v)) else Some((hdT, monoid.plus(v, stateReceived._2)))
  }
}
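
Both calls type-check because a Monoid instance exists for each state type: Long addition is built into Algebird, and HyperLogLog sketches merge via HyperLogLogMonoid. A hedged sketch of the latter (the bit size and inputs are illustrative, not from the original post):

import com.twitter.algebird.{HLL, HyperLogLogMonoid}

implicit val hllMonoid: HyperLogLogMonoid = new HyperLogLogMonoid(12) // 12 bits of accuracy

// Build one-element sketches and merge them with the monoid operation,
// much like updateTotalCountState[HLL] reducing values with monoid.plus.
val a: HLL = hllMonoid("user-1".getBytes("UTF-8"))
val b: HLL = hllMonoid("user-2".getBytes("UTF-8"))
println(hllMonoid.plus(a, b).approximateSize.estimate) // ~2 distinct users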


Original article: https://thewanderingmonad.wordpress.com/2015/05/17/introduction-to-monoids-and-semigroups-with-spark/

References

1. monoid: http://hongjiang.info/semigroup-and-monoid/

2. 幺半群 (Chinese Wikipedia): https://zh.wikipedia.org/wiki/%E5%B9%BA%E5%8D%8A%E7%BE%A4

3. Pimp My Library: http://www.ituring.com.cn/article/195776