[Translation] Introduction to Monoids and Semigroups with Spark

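A monoid (see note 1) is a set with an associative binary operation (+) and an identity element i such that, for every x, x + i = i + x = x. Unlike a group, it does not require inverse elements. Equivalently, a monoid is a semigroup with an identity element.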




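1. Integers and addition

Associativity => (a + b) + c == a + (b + c), and identity => 0 + n == n + 0 == n

2. Integers and multiplication

Associativity => (a * b) * c == a * (b * c), and identity => 1 * n == n * 1 == n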
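
The two laws above can be spot-checked on sample values. The following is a minimal sketch, not part of the original article: the object name `MonoidLaws` and the helpers `isAssociative` and `hasIdentity` are hypothetical, and a check over a handful of samples is only evidence, not a proof.

```scala
object MonoidLaws {
  // Checks (a op b) op c == a op (b op c) for every triple drawn from `samples`.
  def isAssociative[T](samples: Seq[T])(op: (T, T) => T): Boolean =
    samples.forall { a =>
      samples.forall { b =>
        samples.forall { c => op(op(a, b), c) == op(a, op(b, c)) }
      }
    }

  // Checks that `zero` is both a left and a right identity for `op` on `samples`.
  def hasIdentity[T](samples: Seq[T], zero: T)(op: (T, T) => T): Boolean =
    samples.forall(n => op(zero, n) == n && op(n, zero) == n)

  def main(args: Array[String]): Unit = {
    val samples = Seq(-3, 0, 1, 2, 7)
    println(isAssociative(samples)(_ + _))  // true
    println(hasIdentity(samples, 0)(_ + _)) // true
    println(isAssociative(samples)(_ * _))  // true
    println(hasIdentity(samples, 1)(_ * _)) // true
    // Subtraction fails the associativity law, so it does not form a semigroup.
    println(isAssociative(samples)(_ - _))  // false
  }
}
```

Subtraction is included as a counterexample: (1 - 2) - 7 is -8, while 1 - (2 - 7) is 6.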












// It is up to the developer to ensure that op is associative; the compiler cannot check it.
trait SemiGroup[T]{
  def op(a: T, b: T): T
}

trait Monoid[T] extends SemiGroup[T]{
  def zero: T
}

object Monoids{
  implicit object IntAdditionMonoid extends Monoid[Int]{
    def op(a: Int, b: Int): Int = a + b
    def zero: Int = 0
  }
}


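implicit object IntAdditionMonoid extends Monoid[Int]{
  def op(a: Int, b: Int): Int = a + b
  def zero: Int = 0
}

val listA = List(1,3,5,6)

def reduceWithMonoid[A](seq: Seq[A])(implicit ev: Monoid[A]): A = {
  // Start from the identity element and combine every element with the monoid operation.
  seq.fold(ev.zero)(ev.op)
}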
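
The practical difference between the two traits shows up on empty input. The sketch below is an illustration under assumptions, not code from the original article: the names `EmptyReduce`, `foldWithMonoid` and `reduceWithSemiGroup` are hypothetical, and the traits are redeclared so the block is self-contained.

```scala
object EmptyReduce {
  trait SemiGroup[T] { def op(a: T, b: T): T }
  trait Monoid[T] extends SemiGroup[T] { def zero: T }

  implicit object IntAdditionMonoid extends Monoid[Int] {
    def op(a: Int, b: Int): Int = a + b
    def zero: Int = 0
  }

  // With a Monoid we can always start from zero, so empty input still has a result.
  def foldWithMonoid[A](seq: Seq[A])(implicit ev: Monoid[A]): A =
    seq.fold(ev.zero)(ev.op)

  // With only a SemiGroup there is no starting value, so empty input yields None.
  def reduceWithSemiGroup[A](seq: Seq[A])(implicit ev: SemiGroup[A]): Option[A] =
    seq.reduceOption(ev.op)

  def main(args: Array[String]): Unit = {
    println(foldWithMonoid(List(1, 3, 5, 6)))      // 15
    println(foldWithMonoid(List.empty[Int]))       // 0
    println(reduceWithSemiGroup(List(1, 3, 5, 6))) // Some(15)
    println(reduceWithSemiGroup(List.empty[Int]))  // None
  }
}
```

A plain `reduce` over an empty sequence would throw, which is why the semigroup version here returns an `Option`.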


implicit object IntAdditionMonoid extends Monoid[Int]{
  def op(a: Int, b: Int): Int = a + b
  def zero: Int = 0
}

// We must now use a class rather than an object, because type parameters are required: a tuple type is itself parameterized by the types of its elements.
// Here our goal is to define functionality for tuples that contain monoid-abiding types.
class Tuple2SemiGroup[A,B]()(implicit sg1: SemiGroup[A], sg2: SemiGroup[B]) extends SemiGroup[(A,B)]{
  def op(a: (A,B), b: (A,B)): (A,B) = (sg1.op(a._1, b._1), sg2.op(a._2, b._2))
}

// We cannot make the class above an implicit class, because that does something different (more on this in the aside on the Pimp My Library pattern below).
// Instead we can use another feature of implicits: an implicit method. Given SemiGroups for the element types, this function builds a SemiGroup for the tuple itself.
implicit def tuple2SemiGroup[A,B](implicit sg1: SemiGroup[A], sg2: SemiGroup[B]): SemiGroup[(A,B)] = {
  new Tuple2SemiGroup[A,B]()(sg1, sg2)
}

val listA = List((1,2),(3,4),(5,2),(6,9))

def reduceWithMonoid[A](seq: Seq[A])(implicit ev: SemiGroup[A]): A = {
  // A SemiGroup has no identity element, so the sequence must be non-empty.
  seq.reduce(ev.op)
}
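
One common use of a tuple semigroup is to compute several aggregates in a single pass. The following sketch, which is an assumption-laden illustration rather than the article's code, pairs each value with a count of 1 and reduces the pairs to obtain a sum and a count, from which a mean follows. The names `TupleAverage`, `sumAndCount` and `mean` are hypothetical.

```scala
object TupleAverage {
  trait SemiGroup[T] { def op(a: T, b: T): T }

  implicit object IntAdditionSemiGroup extends SemiGroup[Int] {
    def op(a: Int, b: Int): Int = a + b
  }

  // Builds a SemiGroup for pairs out of SemiGroups for the two element types.
  implicit def tuple2SemiGroup[A, B](implicit sg1: SemiGroup[A], sg2: SemiGroup[B]): SemiGroup[(A, B)] =
    new SemiGroup[(A, B)] {
      def op(a: (A, B), b: (A, B)): (A, B) = (sg1.op(a._1, b._1), sg2.op(a._2, b._2))
    }

  // Pairs every value with 1, then reduces component-wise: (sum, count).
  def sumAndCount(values: Seq[Int])(implicit ev: SemiGroup[(Int, Int)]): Option[(Int, Int)] =
    values.map(v => (v, 1)).reduceOption(ev.op)

  def mean(values: Seq[Int]): Option[Double] =
    sumAndCount(values).map { case (sum, count) => sum.toDouble / count }

  def main(args: Array[String]): Unit = {
    println(sumAndCount(List(1, 3, 5, 6))) // Some((15,4))
    println(mean(List(1, 3, 5, 6)))        // Some(3.75)
  }
}
```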



As one more example, semigroups can easily be applied to merging two maps by key while summing the values.

implicit object IntAdditionMonoid extends Monoid[Int]{
  def op(a: Int, b: Int): Int = a + b
  def zero: Int = 0
}

// Here we only need to assume that the values form a SemiGroup, as the keys are just being matched up.
class MapSemiGroup[K,V]()(implicit sg1: SemiGroup[V]) extends SemiGroup[Map[K,V]]{
  // We aggregate starting from one of the maps, then loop through the key-value pairs of the other one and combine.
  // This way any keys that do not appear in the looping map are already there, and keys that appear in both are overwritten with the combined value.
  def op(iteratingMap: Map[K,V], startingMap: Map[K,V]): Map[K,V] = iteratingMap.aggregate(startingMap)({
    (currentMap: Map[K,V], kv: (K,V)) => {
      // Combine with the existing value if the key is present; otherwise keep the new value as is.
      val newValue: V = startingMap.get(kv._1).map(v => sg1.op(v, kv._2)).getOrElse(kv._2)
      currentMap + (kv._1 -> newValue)
    }
  }, {
    // This is the combine part (if done in parallel, there could be two different maps that need to be combined). It assumes that all keys are already combined.
    (mapOne: Map[K,V], mapTwo: Map[K,V]) => mapOne ++ mapTwo
  })
}

// We cannot make the class above an implicit class, because that does something different (more on this in the aside on the Pimp My Library pattern below).
// Instead we can use another feature of implicits: an implicit method. Given a SemiGroup for the value type, this function builds a SemiGroup for the Map itself.
implicit def mapSemiGroup[K,V](implicit sg1: SemiGroup[V]): SemiGroup[Map[K,V]] = {
  new MapSemiGroup[K,V]()(sg1)
}

val mapA = Map("A" -> 1, "B" -> 2, "D" -> 5)
val mapB = Map("A" -> 3, "C" -> 3, "D" -> 1)
val mapC = Map("B" -> 10, "D" -> 3)

def reduceWithMonoid[A](seq: Seq[A])(implicit ev: SemiGroup[A]): A = {
  seq.reduce(ev.op)
}
println(reduceWithMonoid(List(mapA, mapB, mapC)))
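
The same merge can also be written with `foldLeft`, which avoids `aggregate` (deprecated since Scala 2.13). This is a sketch under assumptions rather than the article's implementation: `MapMerge`, `mergeMaps` and `mergeAll` are hypothetical names, and the trait is redeclared so the block stands alone.

```scala
object MapMerge {
  trait SemiGroup[T] { def op(a: T, b: T): T }

  implicit object IntAdditionSemiGroup extends SemiGroup[Int] {
    def op(a: Int, b: Int): Int = a + b
  }

  // Merges `right` into `left`: shared keys are combined with the value SemiGroup,
  // keys present on one side only are carried over unchanged.
  def mergeMaps[K, V](left: Map[K, V], right: Map[K, V])(implicit sg: SemiGroup[V]): Map[K, V] =
    right.foldLeft(left) { case (acc, (k, v)) =>
      acc + (k -> acc.get(k).map(existing => sg.op(existing, v)).getOrElse(v))
    }

  // The empty map acts as the identity element, so any number of maps can be merged.
  def mergeAll[K, V](maps: Seq[Map[K, V]])(implicit sg: SemiGroup[V]): Map[K, V] =
    maps.foldLeft(Map.empty[K, V])((a, b) => mergeMaps(a, b))

  def main(args: Array[String]): Unit = {
    val mapA = Map("A" -> 1, "B" -> 2, "D" -> 5)
    val mapB = Map("A" -> 3, "C" -> 3, "D" -> 1)
    val mapC = Map("B" -> 10, "D" -> 3)
    // The merged map holds A -> 4, B -> 12, C -> 3, D -> 9.
    println(mergeAll(List(mapA, mapB, mapC)))
  }
}
```

Because the empty map is an identity for this merge, maps whose values form a semigroup form a monoid, not just a semigroup.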

A Pimp My Library example (see note 3), and why we do not use implicit classes

// An implicit class takes a single constructor argument: the value of the type being extended ("pimped"). The methods you define then become available on that type as though they were native functionality.
implicit class PimpedString(s: String){
  def pimpMyString(): String = s + " is pimped"
}

println("My String".pimpMyString())
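
An implicit class wraps an existing value in order to add methods to it, whereas a semigroup instance has to be found without any value at hand, which is one way to read why the instances above are provided by implicit objects and implicit methods instead. The two mechanisms can be combined, though. The sketch below is an illustration with hypothetical names (`SemiGroupSyntax`, `SemiGroupOps`, `combine`): it adds a `combine` method to any value whose type has a SemiGroup in scope.

```scala
object SemiGroupSyntax {
  trait SemiGroup[T] { def op(a: T, b: T): T }

  implicit object IntAdditionSemiGroup extends SemiGroup[Int] {
    def op(a: Int, b: Int): Int = a + b
  }

  implicit object StringConcatSemiGroup extends SemiGroup[String] {
    def op(a: String, b: String): String = a + b
  }

  // Wraps a value of any type T; `combine` compiles only if a SemiGroup[T] can be found.
  implicit class SemiGroupOps[T](self: T) {
    def combine(other: T)(implicit sg: SemiGroup[T]): T = sg.op(self, other)
  }

  def main(args: Array[String]): Unit = {
    val one = 1
    println(one.combine(2))      // 3
    println("ab".combine("cd"))  // abcd
  }
}
```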

Spark uses the Pimp My Library pattern to add methods that are available only on RDDs of particular types, for example key-value pair RDDs.


/**
 * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
 */
implicit class PairRDDFunctions[K, V](self: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
extends Logging
with SparkHadoopMapReduceUtil
with Serializable

At this point it may look as though you are expected to design your own monoid/semigroup library. Don't worry: Twitter has already done so and made it usable with Spark (which means everything is serializable). They have also written it to perform efficiently.




// This is a call from an aggregation section that updates state with the HyperLogLog object
val stateUniques = makeModelUniquesTime.updateStateByKey(updateTotalCountState[HLL])
// This is a call from an aggregation section that updates state with the Long
val statePV = makeModelCountReduceWithTime.updateStateByKey(updateTotalCountState[Long])

// This was originally implemented as two methods, one for HLL and one for Long. With monoids we can write a single method that takes care of both cases.
def updateTotalCountState[U](values: Seq[(BananaTimestamp, U)], state: Option[(BananaTimestamp, U)])
    (implicit monoid: Monoid[U], ct: ClassTag[U]): Option[(BananaTimestamp, U)] = {
  val defaultState = (null, monoid.zero)
  values match {
    case Nil => Some(state.getOrElse(defaultState))
    case _ =>
      val hdT = values(0)._1
      // The reduction logic now lives in the monoid definitions rather than in these functions, so this method is distilled to what it takes to update state.
      val v = values.map{case (_, a) => a}.reduce(monoid.plus)
      val stateReceived = state.getOrElse(defaultState)
      if(checkResetState(stateReceived._1, hdT)) Some((hdT, v)) else Some((hdT, monoid.plus(v, stateReceived._2)))
  }
}
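
The idea of one generic update function serving several state types can be tried without Spark. The following is a simplified sketch under stated assumptions, not the production code above: it drops the timestamps and the reset check, declares a local `Monoid` trait whose `plus` and `zero` mirror the calls used above, and uses set union as a stand-in for an approximate distinct counter such as HyperLogLog. All names in it (`StateUpdateSketch`, `updateState`, `LongMonoid`, `setMonoid`) are hypothetical.

```scala
object StateUpdateSketch {
  // Local stand-in for the library's Monoid; not the library's own definition.
  trait Monoid[U] {
    def zero: U
    def plus(a: U, b: U): U
  }

  implicit object LongMonoid extends Monoid[Long] {
    def zero: Long = 0L
    def plus(a: Long, b: Long): Long = a + b
  }

  // Set union stands in for merging distinct-count sketches.
  implicit def setMonoid[T]: Monoid[Set[T]] = new Monoid[Set[T]] {
    def zero: Set[T] = Set.empty[T]
    def plus(a: Set[T], b: Set[T]): Set[T] = a ++ b
  }

  // Combines a batch of new values with the previous state, if there is one.
  def updateState[U](values: Seq[U], state: Option[U])(implicit monoid: Monoid[U]): Option[U] = {
    val batch = values.foldLeft(monoid.zero)(monoid.plus)
    Some(monoid.plus(state.getOrElse(monoid.zero), batch))
  }

  def main(args: Array[String]): Unit = {
    // Page views: a running Long total.
    println(updateState(Seq(2L, 3L), Some(10L))) // Some(15)
    // Uniques: a running set of user ids; print only its size.
    val uniques = updateState(Seq(Set("u1"), Set("u2", "u1")), Option.empty[Set[String]])
    println(uniques.map(_.size)) // Some(2)
  }
}
```

Starting from `monoid.zero` also makes the empty-batch case fall out naturally: the previous state is returned unchanged.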



1. monoid: http://hongjiang.info/semigroup-and-monoid/


