Scalaz(54)- scalaz-stream: 函数式多线程编程模式-Free Streaming Programming Model
2016-08-19 11:31
441 查看
长久以来,函数式编程模式都被认为是一种学术研究用或教学实验用的编程模式。直到近几年由于大数据和多核CPU的兴起造成了函数式编程模式在一些实际大型应用中的出现,这才逐渐改变了人们对函数式编程无用论的观点。通过一段时间对函数式编程方法的学习,我们了解到Free Monad的算式/算法关注分离(separation of concern)可以是一种很实用的函数式编程模式。用Free Monad编写的程序容易理解并具备良好的可维护性。scalaz-stream的流程控制和多线程运算模式可以实现程序的安全并行运算。把Free Monad和scalaz-stream有机结合起来可以形成一种新的编程模式来支持函数式多线程编程来编制具备安全性、易扩展、易维护的并行运算程序。我们先从一个简单的Free Monad程序开始:
在这个程序里我们按照一个固定的框架步骤来实现“定义语句”、“升格Free”、“功能描述”及“实现方式”。这里特别需要注意的是所谓的算式/算法关注分离,即“功能描述”和“实现方式”是互不关联的。这样我们可以提供不同版本的实现方式来进行测试、环境转换等工作。Free Monad的具体运算方式如下:
运算结果返回A:对于prgGetName来说就是Unit。不过如果直接运行foldMapRec有可能会产生副作用(siede effect)。这样不符合纯代码要求,无法实现这个程序与其它程序的函数组合。我们需要把这段可能产生副作用的代码放到Task里:
这样我们就获得了一个异线程的延迟运算。我们可以放心地用这个taskGetName进行函数组合。把这个Free Monad程序转换成scalaz-stream的Process也很容易:
我们用Process.eval直接把它转换成Process[Task,Unit]类型。下面我们用scalaz-stream的运算方式来运算这个Free Monad程序:
运算结果如下:
虽然这个例子看起来很简单,但其中代表的意义却不小:我们潜移默化地实现了函数式多线程编程了。
如果我们需要Free Monad程序返回运算结果的话就调整一下功能描述(算式):
再运算一下:
用纯代码方式echo输入:
也可以把结果发送到一个Sink来显示:
我们试着再加一个Free程序功能:验证用户编号
stream流程是:先读取用户编号然后验证,跟着在Sink输出结果:
不错!Free Monad和scalar-stream可以很好的集成在一起。
我把这节讨论的示范源代码提供给大家:
import scalaz._ import Scalaz._ import scalaz.concurrent._ import scalaz.stream._ import scala.language.higherKinds import scala.language.implicitConversions object freeStream { //1. 定义语句 object DSLs { sealed trait Interact[A] case class Ask(q: String) extends Interact[String] case class Tell(m: String) extends Interact[Unit] //2. Free升格 implicit def interactToFree[A](ia: Interact[A]) = Free.liftF(ia) } //3. 程序逻辑/算式 object PRGs { import DSLs._ val prgGetName: Free[Interact,Unit] = for { first <- Ask("What's your first name?") last <- Ask("What's your last name?") _ <- Tell(s"Hello $first $last") } yield () } //4. 实现方式/算式 object IMPs { import DSLs._ object InteractConsole extends (Interact ~> Id) { def apply[A](ia: Interact[A]): Id[A] = ia match { case Ask(q) => {println(q); Console.readLine} case Tell(m) => println(m) } } }
在这个程序里我们按照一个固定的框架步骤来实现“定义语句”、“升格Free”、“功能描述”及“实现方式”。这里特别需要注意的是所谓的算式/算法关注分离,即“功能描述”和“实现方式”是互不关联的。这样我们可以提供不同版本的实现方式来进行测试、环境转换等工作。Free Monad的具体运算方式如下:
//5. 运算/Run import DSLs._,PRGs._,IMPs._ prgGetName.foldMapRec(InteractConsole)
运算结果返回A:对于prgGetName来说就是Unit。不过如果直接运行foldMapRec有可能会产生副作用(siede effect)。这样不符合纯代码要求,无法实现这个程序与其它程序的函数组合。我们需要把这段可能产生副作用的代码放到Task里:
val taskGetName = Task.delay { prgGetName.foldMapRec(InteractConsole)} //> taskGetName : scalaz.concurrent.Task[scalaz.Scalaz.Id[Unit]] = scalaz.concurrent.Task@282ba1e
这样我们就获得了一个异线程的延迟运算。我们可以放心地用这个taskGetName进行函数组合。把这个Free Monad程序转换成scalaz-stream的Process也很容易:
val prcGetName = Process.eval(taskGetName) //> prcGetName : scalaz.stream.Process[scalaz.concurrent.Task,scalaz.Scalaz.Id[Unit]] = Await(scalaz.concurrent.Task@282ba1e,<function1,<function1>)
我们用Process.eval直接把它转换成Process[Task,Unit]类型。下面我们用scalaz-stream的运算方式来运算这个Free Monad程序:
object FreeInteract extends App { import DSLs._,PRGs._,IMPs._ val taskGetName = Task.delay { prgGetName.foldMapRec(InteractConsole)} val prcGetName = Process.eval(taskGetName) prcGetName.run.run }
运算结果如下:
What's your first name? tiger What's your last name? chan Hello, tiger chan!
虽然这个例子看起来很简单,但其中代表的意义却不小:我们潜移默化地实现了函数式多线程编程了。
如果我们需要Free Monad程序返回运算结果的话就调整一下功能描述(算式):
val prgGetUserID = for { uid <- ask("Enter User ID:") } yield uid
再运算一下:
object FreeInteract extends App { import DSLs._,PRGs._,IMPs._ val taskGetName = Task.delay { prgGetName.foldMapRec(InteractConsole)} val prcGetName = Process.eval(taskGetName) //prcGetName.run.run Process.eval(Task.delay{prgGetUserID.foldMapRec(InteractConsole)}).runLog.run.map(println) ... Enter User ID: tiger123 tiger123
用纯代码方式echo输入:
pUserID.evalMap { uid => Task.delay {prgEchoInput(uid).foldMapRec(InteractConsole)} }.run.run ... Enter User ID: user234 user234
也可以把结果发送到一个Sink来显示:
val outSink: Sink[Task,String] = Process.constant{x =>Task.delay{prgEchoInput(x).foldMapRec(InteractConsole)}} (pUserID to outSink).run.run ... Enter User ID: jonathon jonathon
我们试着再加一个Free程序功能:验证用户编号
sealed trait Login[A] case class CheckID(id: String) extends Login[Boolean] ... def prgCheckID(id: String) = for { b <- Free.liftF(CheckID(id)) } yield b ... object UserLogin extends (Login ~> Id) { def apply[A](la: Login[A]): Id[A] = la match { case CheckID(id) => if (id === "tiger123") true else false } }
stream流程是:先读取用户编号然后验证,跟着在Sink输出结果:
def fCheckID: String => Task[String] = id => Task.delay { prgCheckID(id).foldMapRec(UserLogin) }.map(_.toString) val chCheckID = channel.lift(fCheckID) ((pUserID through chCheckID) to outSink).run.run ... Enter User ID: tiger123 true ... Enter User ID: johnny234 false
不错!Free Monad和scalar-stream可以很好的集成在一起。
我把这节讨论的示范源代码提供给大家:
import scalaz._ import Scalaz._ import scalaz.concurrent._ import scalaz.stream._ object DSLs { sealed trait Interact[A] case class Ask(q: String) extends Interact[String] case class Tell(m: String) extends Interact[Unit] object Interact { def ask(q: String): Free[Interact, String] = Free.liftF(Ask(q)) def tell(m: String): Free[Interact, Unit] = Free.liftF(Tell(m)) } sealed trait Login[A] case class CheckID(id: String) extends Login[Boolean] } object PRGs { import DSLs._ import Interact._ val prgGetName = for { first <- ask("What's your first name?") last <- ask("What's your last name?") _ <- tell(s"Hello, $first $last!") } yield() val prgGetUserID = for { uid <- ask("Enter User ID:") } yield uid def prgEchoInput(m: String) = tell(m) def prgCheckID(id: String) = for { b <- Free.liftF(CheckID(id)) } yield b } object IMPs { import DSLs._ object InteractConsole extends (Interact ~> Id) { def apply[A](ia: Interact[A]): Id[A] = ia match { case Ask(q) => { println(q); readLine } case Tell(m) => println(m) } } object UserLogin extends (Login ~> Id) { def apply[A](la: Login[A]): Id[A] = la match { case CheckID(id) => if (id === "tiger123") true else false } } } object FreeInteract extends App { import DSLs._,PRGs._,IMPs._ val taskGetName = Task.delay { prgGetName.foldMapRec(InteractConsole)} val prcGetName = Process.eval(taskGetName) //prcGetName.run.run val pUserID= Process.eval(Task.delay{prgGetUserID.foldMapRec(InteractConsole)}) //pUserID.evalMap { uid => Task.delay {prgEchoInput(uid).foldMapRec(InteractConsole)} }.run.run val outSink: Sink[Task,String] = Process.constant { x => Task.delay {prgEchoInput(x).foldMapRec(InteractConsole) } } //(pUserID to outSink).run.run def fCheckID: String => Task[String] = id => Task.delay { prgCheckID(id).foldMapRec(UserLogin) }.map(_.toString) val chCheckID = channel.lift(fCheckID) ((pUserID through chCheckID) to outSink).run.run
相关文章推荐
- 代码注释规范
- C语言基本教程 第7课:数组和字符串
- 详解Python实现按任意键继续/退出的功能
- C++语言的表达式模板:表达式模板的入门性介绍
- PHP中new static()与new self()的比较
- 【笔试】航天飞行器
- java实现spark streaming与kafka集成进行流式计算
- 实习时遇到的大神写的代码, 自己差距还是有的,潜力巨大!加油!
- leetcode:二叉树之Construct Binary Tree from Inorder and Postorder Traversal
- java里面的线程
- Java反射机制笔记
- Spring Security权限taglib
- github上Fuchsia项目相关文章翻译 - Relationship with LK (fuchsia-mirror/magenta/mg_and_lk.md)
- JAVA学习代码——了解java.io
- java的设计模式总结
- exit()函数详解
- thinkphp批量删除的实现
- Laravel 5.1 事件
- C++中的vector容器
- JAVA学习代码——追加文件内容的三种方法