用scala语言实现并行堆排序(top k)
2015-09-25 12:27
441 查看
因为项目需要对大量数据进行排序计算top k,开始了解并行计算框架,接触了spark,spark都是用scala写的,所以为了了解spark,恶补了一阵scala语言。
这是一种非常简练的函数式语言,最让我感兴趣的就是它天然支持并行计算,并且因为生成的目标代码是java虚拟机上的class,所以与java有着天然的亲和力,可以与java代码之间自由地互相调用。
原本是想通过spark架构来实现大数据的快速排序(实现top k),仔细研究了spark后发现有难度,就暂时放弃了这个方案。但是想到了新的解决方法,就是利用scala(研究spark的副产品)的并行特性来实现大数据的快速排序模块,加入到系统中,供java代码调用。。。
下面的代码就是这个模块的核心排序算法。
总体的流程就是:
在top_mutable_par方法中,对要排序的数据进行分段,然后利用scala的并行特性,以并行方式调用sort_range对每一段数据进行分段排序,之后再reduce所有的分段排序结果
这是一种非常简练的函数式语言,最让我感兴趣的就是它天然支持并行计算,并且因为生成的目标代码是java虚拟机上的class,所以与java有着天然的亲和力,可以与java代码之间自由地互相调用。
原本是想通过spark架构来实现大数据的快速排序(实现top k),仔细研究了spark后发现有难度,就暂时放弃了这个方案。但是想到了新的解决方法,就是利用scala(研究spark的副产品)的并行特性来实现大数据的快速排序模块,加入到系统中,供java代码调用。。。
下面的代码就是这个模块的核心排序算法。
总体的流程就是:
在top_mutable_par方法中,对要排序的数据进行分段,然后利用scala的并行特性,以并行方式调用sort_range对每一段数据进行分段排序,之后再reduce所有的分段排序结果
import scala.collection.mutable
import scala.collection.JavaConversions

/**
 * Heap-sort based sorting / top-k selection, with an optional parallel variant.
 *
 * Every method delegates to the companion object, supplying the key extractor `f`
 * and the implicit ordering captured at construction time.
 *
 * @author guyadong
 * @param f   extracts the sort key from an element
 * @param ord ordering of the sort keys
 * @tparam A element type
 * @tparam B sort-key type
 * @tparam S collection type accepted by `sort` / `top`
 */
class HeapSort[A, B, S <: Iterable[A]](f: A => B)(implicit ord: Ordering[B]) {

  /**
   * Sorts `l` and returns the sorted elements as a new sequence.
   *
   * @param l    collection to sort (copied into a buffer, not modified)
   * @param desc `true` (default) for descending order, `false` for ascending
   */
  def sort(l: S, desc: Boolean = true): mutable.Seq[A] = HeapSort.sort(f)(l, 0, desc)

  /**
   * Sorts `l` and returns at most the first `top` elements of the sorted order.
   *
   * @param l    collection to sort (copied into a buffer, not modified)
   * @param top  maximum number of results; `<= 0` means "all"
   * @param desc `true` (default) for descending order, `false` for ascending
   */
  def top(l: S, top: Int, desc: Boolean = true): mutable.Seq[A] = HeapSort.sort(f)(l, top, desc)

  /**
   * Sorts the mutable sequence `l` (its contents are rearranged) and returns the sorted elements.
   *
   * @param l    mutable sequence to sort
   * @param desc `true` (default) for descending order, `false` for ascending
   */
  def sort_m[M <: mutable.Seq[A]](l: M, desc: Boolean = true): mutable.Seq[A] =
    HeapSort.sort_mutable(f)(l, 0, desc)

  /**
   * Sorts the mutable sequence `l` (its contents are rearranged) and returns at most `top` results.
   *
   * @param l    mutable sequence to sort
   * @param top  maximum number of results; `<= 0` means "all"
   * @param desc `true` (default) for descending order, `false` for ascending
   */
  def top_m[M <: mutable.Seq[A]](l: M, top: Int, desc: Boolean = true): mutable.Seq[A] =
    HeapSort.sort_mutable(f)(l, top, desc)

  /**
   * Parallel variant of [[top_m]]: segments of `l` are sorted in parallel and then merged.
   *
   * @param l    mutable sequence to sort (its contents are rearranged)
   * @param top  maximum number of results; `<= 0` means "all"
   * @param desc `true` (default) for descending order, `false` for ascending
   */
  def top_m_par[M <: mutable.Seq[A]](l: M, top: Int, desc: Boolean = true): mutable.Seq[A] =
    HeapSort.top_mutable_par(f)(l, top, desc)

  /**
   * Heap-sorts the index range `[from, until)` of `seq` in place.
   *
   * @param seq   mutable sequence to sort
   * @param top   maximum number of results; `<= 0` means "the whole range"
   * @param desc  `true` (default) for descending order, `false` for ascending
   * @param from  first index of the range (inclusive)
   * @param until end index of the range (exclusive)
   * @return the index range `(start, until)` that holds the selected elements; within it the
   *         best element is stored last
   */
  def sort_range[M <: mutable.Seq[A]](seq: M, top: Int, desc: Boolean = true)(from: Int = 0, until: Int = seq.length): (Int, Int) =
    HeapSort.sort_mutableRange(f)(seq, top, desc)(from, until)

  /**
   * Merges the sorted range `src` of `seq` into the sorted range `dst` (queue-based merge).
   *
   * @param seq  mutable sequence containing both ranges
   * @param src  source range `(start, end)`, end exclusive
   * @param dst  destination range `(start, end)`, end exclusive
   * @param desc `true` (default) for descending order, `false` for ascending
   * @return `dst`
   */
  def merge2Seq(seq: mutable.Seq[A], src: (Int, Int), dst: (Int, Int), desc: Boolean = true): (Int, Int) =
    HeapSort.merge2Seq(f)(seq, src, dst, desc)

  /**
   * Merges the sorted range `src` of `seq` into the sorted range `dst` (merge using a copy of `dst`).
   *
   * @param seq  mutable sequence containing both ranges
   * @param src  source range `(start, end)`, end exclusive
   * @param dst  destination range `(start, end)`, end exclusive
   * @param desc `true` (default) for descending order, `false` for ascending
   * @return `dst`
   */
  def merge2Seq2(seq: mutable.Seq[A], src: (Int, Int), dst: (Int, Int), desc: Boolean = true): (Int, Int) =
    HeapSort.merge2Seq2(f)(seq, src, dst, desc)

  /**
   * Merges the sorted range `src` of `seq` into the sorted range `dst` without allocating
   * a temporary collection. The contents of the `src` range are rearranged in the process.
   *
   * @param seq  mutable sequence containing both ranges
   * @param src  source range `(start, end)`, end exclusive
   * @param dst  destination range `(start, end)`, end exclusive
   * @param desc `true` (default) for descending order, `false` for ascending
   * @return `dst`
   */
  def merge2SeqNM(seq: mutable.Seq[A], src: (Int, Int), dst: (Int, Int), desc: Boolean = true): (Int, Int) =
    HeapSort.merge2SeqNM(f)(seq, src, dst, desc)
}

object HeapSort {

  /**
   * Copies `iterator` into a buffer, sorts the copy and returns the sorted result.
   *
   * @param f        extracts the sort key from an element
   * @param iterator collection to sort (left untouched)
   * @param top      maximum number of results; `<= 0` means "all"
   * @param desc     `true` (default) for descending order, `false` for ascending
   */
  def sort[A, B, S <: Iterable[A]](f: A => B)(iterator: S, top: Int = 0, desc: Boolean = true)(implicit ord: Ordering[B]): mutable.Seq[A] = {
    val bf = iterator.toBuffer
    sort_mutable(f)(bf, top, desc)
  }

  /**
   * Heap-sorts `seq` in place and returns the best `top` elements, best first.
   *
   * @param f    extracts the sort key from an element
   * @param seq  mutable sequence to sort (its contents are rearranged)
   * @param top  maximum number of results; `<= 0` or `>= seq.length` means "all"
   * @param desc `true` (default) for descending order, `false` for ascending
   */
  def sort_mutable[A, B, S <: mutable.Seq[A]](f: A => B)(seq: S, top: Int = 0, desc: Boolean = true)(implicit ord: Ordering[B]): mutable.Seq[A] = {
    sort_mutableRange(f)(seq, top, desc)()
    // The range sort leaves the selected elements at the tail with the best one last,
    // so take the tail and reverse it.
    (if (top < seq.length && top > 0) seq.takeRight(top) else seq).reverse
  }

  /**
   * Heap-sorts the index range `[from, until)` of `seq` in place, stopping once `top`
   * elements have been moved to the tail of the range.
   *
   * @return the index range `(start, until)` holding the selected elements (best element last);
   *         an empty range `(until, until)` when `[from, until)` is empty
   */
  private def sort_mutableRange[A, B, S <: mutable.Seq[A]](f: A => B)(seq: S, top: Int = 0, desc: Boolean = true)(from: Int = 0, until: Int = seq.length)(implicit ord: Ordering[B]): (Int, Int) = {
    val sublen = until - from
    if (sublen <= 0) {
      // Nothing to sort. Without this guard the first swap below would touch index
      // `until - 1`, which lies outside the (empty) range.
      (until, until)
    } else {
      buildHeapRange(f)(seq, desc)(from, until) // build the heap
      val toplen = if (top <= 0 || top >= sublen) sublen else top
      val stopAt = until - toplen
      var i = until - 1
      var running = true
      while (running) {
        // Move the current heap root (best remaining element) to position i.
        swap(seq, from, i)
        if (i > stopAt) {
          heapify(f)(seq, from, i, desc, from)
          i -= 1
        } else running = false
      }
      (i, until)
    }
  }

  /** Turns the range `[from, until)` of `seq` into a heap whose root is stored at `from`. */
  private def buildHeapRange[A, B](f: A => B)(seq: mutable.Seq[A], desc: Boolean)(from: Int, until: Int)(implicit ord: Ordering[B]): Unit = {
    var i = from + ((until - from) >>> 1) - 1
    while (i >= from) {
      heapify(f)(seq, i, until, desc, from)
      i -= 1
    }
  }

  /** `true` when the key of `l` is greater than the key of `r`. */
  def cmp1_gt[A, B](f: A => B)(l: A, r: A)(implicit ord: Ordering[B]): Boolean = ord.gt(f(l), f(r))

  /** `true` when the key of `l` is less than the key of `r`. */
  def cmp1_lt[A, B](f: A => B)(l: A, r: A)(implicit ord: Ordering[B]): Boolean = ord.lt(f(l), f(r))

  /** `true` when the key of `seq(l)` is greater than the key of `seq(r)`. */
  def cmp_gt[A, B](f: A => B, seq: mutable.Seq[A])(l: Int, r: Int)(implicit ord: Ordering[B]): Boolean =
    cmp1_gt(f)(seq(l), seq(r))

  /** `true` when the key of `seq(l)` is less than the key of `seq(r)`. */
  def cmp_lt[A, B](f: A => B, seq: mutable.Seq[A])(l: Int, r: Int)(implicit ord: Ordering[B]): Boolean =
    cmp1_lt(f)(seq(l), seq(r))

  /**
   * Sifts the element at `startpos` down the heap stored in `[off, max)`.
   *
   * @param startpos index of the element to sift down
   * @param max      end of the heap (exclusive)
   * @param desc     `true` keeps the greatest key at the root, `false` the smallest
   * @param off      index of the heap root; child indices are computed relative to it
   */
  private def heapify[A, B](f: A => B)(seq: mutable.Seq[A], startpos: Int, max: Int, desc: Boolean, off: Int)(implicit ord: Ordering[B]): Unit = {
    def wins(l: Int, r: Int): Boolean = if (desc) cmp_gt(f, seq)(l, r) else cmp_lt(f, seq)(l, r)
    var idx = startpos
    var sifting = true
    while (sifting) {
      val right = off + ((idx - off + 1) << 1)
      val left = right - 1
      var best = if (left < max && wins(left, idx)) left else idx
      if (right < max && wins(right, best)) best = right
      if (best != idx) {
        swap(seq, best, idx)
        idx = best
      } else sifting = false
    }
  }

  /** Exchanges `seq(i)` and `seq(j)`. */
  private def swap[A](seq: mutable.Seq[A], i: Int, j: Int): Unit = {
    val temp = seq(i)
    seq(i) = seq(j)
    seq(j) = temp
  }

  /** Rotates three slots: `seq(i) <- seq(j) <- seq(k) <- old seq(i)`. */
  private def swap3[A](seq: mutable.Seq[A], i: Int, j: Int, k: Int): Unit = {
    val temp = seq(i)
    seq(i) = seq(j)
    seq(j) = seq(k)
    seq(k) = temp
  }

  /**
   * Copies `length` elements from `src` (starting at `srcPos`) to `dest` (starting at `destPos`),
   * front to back, using an index-based while loop.
   *
   * @return `dest`
   */
  private def _duplicateSeq[A](src: collection.Seq[A], srcPos: Int, dest: mutable.Seq[A], destPos: Int, length: Int): mutable.Seq[A] = {
    var i = 0
    while (i < length) {
      dest(destPos + i) = src(srcPos + i)
      i += 1
    }
    dest
  }

  /**
   * Handles the merge cases that need no element-wise interleaving:
   *  - either range is empty: nothing to do;
   *  - the worst element of `dst` already beats the best element of `src`: nothing to do;
   *  - the worst element of `src` beats the best element of `dst`: shift the tail of `dst`
   *    down and copy the tail of `src` behind it.
   *
   * @return `true` when the merge is complete, `false` when the caller must interleave
   */
  private def mergeTrivially[A, B](f: A => B)(seq: mutable.Seq[A], src: (Int, Int), dst: (Int, Int), desc: Boolean)(implicit ord: Ordering[B]): Boolean = {
    def wins(l: Int, r: Int): Boolean = if (desc) cmp_gt(f, seq)(l, r) else cmp_lt(f, seq)(l, r)
    if (src._2 <= src._1 || dst._2 <= dst._1) true
    else if (wins(dst._1, src._2 - 1)) true
    else if (wins(src._1, dst._2 - 1)) {
      val (srclen, dstlen) = (src._2 - src._1, dst._2 - dst._1)
      val cplen = math.min(srclen, dstlen)
      _duplicateSeq(seq, dst._1 + cplen, seq, dst._1, dstlen - cplen)
      _duplicateSeq(seq, src._2 - cplen, seq, dst._2 - cplen, cplen)
      true
    } else false
  }

  /**
   * Merges the sorted range `src` into the sorted range `dst` of the same sequence, keeping
   * the best `dst`-length elements in `dst`. Displaced `dst` elements are buffered in a queue.
   *
   * Both ranges must be sorted with the best element last (as produced by the range sort).
   *
   * @return `dst`
   */
  def merge2Seq[A, B](f: A => B)(seq: mutable.Seq[A], src: (Int, Int), dst: (Int, Int), desc: Boolean)(implicit ord: Ordering[B]): (Int, Int) = {
    if (!mergeTrivially(f)(seq, src, dst, desc)) {
      val q = mutable.Queue[A]()
      def srcWins(r: Int): Boolean =
        if (desc) cmp1_gt(f)(seq(r), q.head) else cmp1_lt(f)(seq(r), q.head)
      var topsrc = src._2 - 1
      var idx = dst._2 - 1
      while (idx >= dst._1) {
        // The element currently at idx is a candidate too: queue it before overwriting.
        q.enqueue(seq(idx))
        // Only consult src while it still has elements; otherwise drain the queue.
        if (topsrc >= src._1 && srcWins(topsrc)) {
          seq(idx) = seq(topsrc)
          topsrc -= 1
        } else seq(idx) = q.dequeue()
        idx -= 1
      }
    }
    dst
  }

  /**
   * Same contract as [[merge2Seq]], implemented by merging `src` with a copy of `dst`.
   *
   * @return `dst`
   */
  def merge2Seq2[A, B](f: A => B)(seq: mutable.Seq[A], src: (Int, Int), dst: (Int, Int), desc: Boolean)(implicit ord: Ordering[B]): (Int, Int) = {
    if (!mergeTrivially(f)(seq, src, dst, desc)) {
      val q = seq.slice(dst._1, dst._2)
      def srcWins(l: Int, r: Int): Boolean =
        if (desc) cmp1_gt(f)(seq(l), q(r)) else cmp1_lt(f)(seq(l), q(r))
      var topdst = q.length - 1
      var topsrc = src._2 - 1
      var idx = dst._2 - 1
      while (idx >= dst._1 && topsrc >= src._1) {
        if (srcWins(topsrc, topdst)) {
          seq(idx) = seq(topsrc)
          topsrc -= 1
        } else {
          seq(idx) = q(topdst)
          topdst -= 1
        }
        idx -= 1
      }
      // src is exhausted: fill the remaining slots with the best remaining dst elements.
      if (idx >= dst._1) _duplicateSeq(q, topdst - (idx - dst._1), seq, dst._1, idx - dst._1 + 1)
    }
    dst
  }

  /**
   * Same contract as [[merge2Seq]], but without allocating a temporary collection: displaced
   * `dst` elements are parked in slots of `src` that have already been consumed, so the
   * contents of the `src` range are rearranged.
   *
   * NOTE(review): the bookkeeping below (`qbf`/`qbt` look like the bounds of the parked
   * region, `qh` its current head) is reproduced unchanged from the original code; it has
   * not been independently verified here.
   *
   * @return `dst`
   */
  def merge2SeqNM[A, B](f: A => B)(seq: mutable.Seq[A], src: (Int, Int), dst: (Int, Int), desc: Boolean)(implicit ord: Ordering[B]): (Int, Int) = {
    if (!mergeTrivially(f)(seq, src, dst, desc)) {
      var (idx, qbf, qbt, qh) = (dst._2 - 1, dst._2 - 1, dst._2 - 1, dst._2 - 1)
      var st = src._2 - 1
      // Handlers are swapped at run time once the first src element has been taken.
      var swapst: () => Unit = () => ()
      var swapqh: () => Unit = () => ()
      def srcWins(l: Int): Boolean = if (desc) cmp_gt(f, seq)(l, qh) else cmp_lt(f, seq)(l, qh)
      def swaptop(top: Int): Unit = {
        val temp = seq(idx)
        seq(idx) = seq(top)
        seq(top) = temp
      }
      def getql(): Int = qbf + (qh - qbf + 1) % (qbt - qbf + 1)
      def nextqh(): Int = qbt - (qbt - qh + 1) % (qbt - qbf + 1)
      // Shifts seq(from..to) by `step` slots; iterates from the far end so no element is
      // overwritten before it has been moved.
      def moveStep(from: Int, to: Int, step: Int): Unit = {
        if (step > 0) {
          var i = to
          while (i >= from) {
            seq(i + step) = seq(i)
            i -= 1
          }
        } else {
          var i = from
          while (i <= to) {
            seq(i + step) = seq(i)
            i += 1
          }
        }
      }
      def swapLeft(from: Int, to: Int): Unit = {
        val tmp = seq(from - 1)
        moveStep(from, to, -1)
        seq(to) = tmp
      }
      def swapRight(from: Int, to: Int): Unit = {
        val tmp = seq(to + 1)
        moveStep(from, to, 1)
        seq(from) = tmp
      }
      def swapStTail(): Unit = {
        swaptop(st)
        val ql = getql()
        if (ql > qbf) {
          if (qh - qbf > qbt - ql) {
            swap(seq, st, qbt)
            swapRight(ql, qbt - 1)
            qbf = st
          } else {
            swapLeft(qbf, qh)
            qbf = st
            qh = nextqh()
          }
        } else {
          qbf = st
        }
      }
      def swapQhDisable(): Unit = {
        qbf -= 1
        qbt -= 1
        qh -= 1
      }
      def swapQhEnable(): Unit = {
        swaptop(qh)
        qh = nextqh()
      }
      def swapStHead(): Unit = {
        swaptop(st)
        swapst = () => swapStTail()
        swapqh = () => swapQhEnable()
        qh = st
        qbf = st
        qbt = st
      }
      swapst = () => swapStHead()
      swapqh = () => swapQhDisable()
      while (idx >= dst._1 && st >= src._1) {
        if (srcWins(st)) {
          swapst()
          st -= 1
        } else swapqh()
        idx -= 1
      }
      if (idx >= dst._1) {
        val ql = getql()
        _duplicateSeq(seq, ql, seq, dst._1, qbt - ql + 1)
        _duplicateSeq(seq, qbf, seq, dst._1 + qbt - ql + 1, ql - qbf)
      }
    }
    dst
  }

  // Number of processors reported by the JVM; used to size the parallel segments.
  private val processors = Runtime.getRuntime.availableProcessors()

  /**
   * Parallel top-k: splits `seq` into segments, heap-sorts each segment in parallel and then
   * reduces the per-segment results with [[merge2SeqNM]].
   *
   * @param f    extracts the sort key from an element
   * @param seq  mutable sequence to sort (its contents are rearranged)
   * @param top  maximum number of results; `<= 0` or `>= seq.length` means "all"
   * @param desc `true` (default) for descending order, `false` for ascending
   * @return the best `top` elements, best first
   */
  def top_mutable_par[A, B, M <: mutable.Seq[A]](f: A => B)(seq: M, top: Int, desc: Boolean = true)(implicit ord: Ordering[B]): mutable.Seq[A] = {
    val len = seq.length
    if (len == 0) {
      // An empty input would otherwise produce a zero step and a division by zero.
      seq.slice(0, 0)
    } else {
      // Segment length derived from the processor count ...
      val perCore = (len + processors - 1) / processors
      // ... but never shorter than `top`: a merge keeps only as many elements as its larger
      // input range holds, so shorter segments would truncate the result. A full sort
      // (top <= 0 or top >= len) therefore uses a single segment.
      val step = if (top <= 0 || top >= len) len else math.max(perCore, top)
      // Sort every segment in parallel, honouring the requested direction.
      val ranges = for (i <- (0 until (len + step - 1) / step).par) yield {
        sort_mutableRange(f)(seq, top, desc)(i * step, math.min(len, (i + 1) * step))
      }
      // Always merge the shorter range into the longer one.
      def merge(left: (Int, Int), right: (Int, Int)): (Int, Int) =
        if ((right._2 - right._1) > (left._2 - left._1)) merge2SeqNM(f)(seq, left, right, desc)
        else merge2SeqNM(f)(seq, right, left, desc)
      val r = ranges.reduce(merge(_, _))
      // The merged range stores the best element last, so reverse it.
      seq.slice(r._1, r._2).reverse
    }
  }

  /** Small demonstration / smoke test. */
  def main(args: Array[String]): Unit = {
    val m = new HeapSort[Int, Int, mutable.Buffer[Int]]((w: Int) => w)
    // Top 5 of a fixed sample.
    println(m.top(mutable.Buffer(7, 11, 9, 17, 15, 21, 8, 30, 14, 0, 12, 15, 55, 2, 3, 18, 22, 23, 4), 5).toString())
    val rnd = new java.util.Random()
    val l = new Array[Int](40)
    for (i <- 0 until l.length) {
      l(i) = rnd.nextInt(100)
    }
    for (i <- 0 to 0) {
      println(s"==============time $i=================")
      val s = l.toBuffer[Int]
      println(s)
      val t1 = System.currentTimeMillis
      val r1 = m.sort_range(s, 10, true)(0, 5)
      val r2 = m.sort_range(s, 10, true)(5, 40)
      val t2 = System.currentTimeMillis
      // Milliseconds to seconds: divide by 1000.
      printf("sort time cost:%f seconds(%d mills) used\n", (t2 - t1) / 1000D, t2 - t1)
      for (j <- r1._1 until r1._2) {
        print(s"${s(j)},")
      }
      println(r1)
      for (j <- r2._1 until r2._2) {
        print(s"${s(j)},")
      }
      println(r2)
      m.merge2Seq2(s, r1, r2)
      for (j <- (r2._1 until r2._2).reverse) {
        print(s"${s(j)},")
      }
      println(r2)
    }
  }
}
相关文章推荐
- java对世界各个时区(TimeZone)的通用转换处理方法(转载)
- java-注解annotation
- java-模拟tomcat服务器
- java-用HttpURLConnection发送Http请求.
- java-WEB中的监听器Lisener
- Android IPC进程间通讯机制
- Android Native 绘图方法
- Android java 与 javascript互访(相互调用)的方法例子
- 介绍一款信息管理系统的开源框架---jeecg
- 聚类算法之kmeans算法java版本
- java实现 PageRank算法
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- PropertyChangeListener简单理解
- 插入排序
- 冒泡排序
- 堆排序