spark学习-73-源代码:Endpoint模型介绍(5)-Netty通讯小例子
2018-01-07 16:30
531 查看
想理解Rpc的通讯,先看看Netty的通讯小例子,有助于理解Spark的Rpc通讯原理
先看一个整体图
静态图:
动态图gif动画:
Spark2.2以后统一了通讯方式,全部是netty方式,根据源码的思路用scala写了一个Demo级别的netty通信
>
先看一个整体图
静态图:
动态图gif动画:
Spark2.2以后统一了通讯方式,全部是netty方式,根据源码的思路用scala写了一个Demo级别的netty通信
package com.spark.netty import io.netty.bootstrap.ServerBootstrap import io.netty.channel.ChannelInitializer import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.nio.NioServerSocketChannel import io.netty.handler.codec.serialization.{ClassResolvers, ClassResolver, ObjectDecoder, ObjectEncoder} /** * Created by root on 2016/11/18. */ class NettyServer { def bind(host: String, port: Int): Unit = { //配置服务端线程池组 //用于服务器接收客户端连接 val bossGroup = new NioEventLoopGroup() //用户进行SocketChannel的网络读写 val workerGroup = new NioEventLoopGroup() try { //是Netty用户启动NIO服务端的辅助启动类,降低服务端的开发复杂度 val bootstrap = new ServerBootstrap() //将两个NIO线程组作为参数传入到ServerBootstrap bootstrap.group(bossGroup, workerGroup) //创建NioServerSocketChannel .channel(classOf[NioServerSocketChannel]) //绑定I/O事件处理类 .childHandler(new ChannelInitializer[SocketChannel] { override def initChannel(ch: SocketChannel): Unit = { ch.pipeline().addLast( // new ObjectEncoder, // new ObjectDecoder(ClassResolvers.cacheDisabled(getClass.getClassLoader)), new ServerHandler ) } }) //绑定端口,调用sync方法等待绑定操作完成 val channelFuture = bootstrap.bind(host, port).sync() //等待服务关闭 channelFuture.channel().closeFuture().sync() } finally { //优雅的退出,释放线程池资源 bossGroup.shutdownGracefully() workerGroup.shutdownGracefully() } } } object NettyServer { def main(args: Array[String]) { val host = "127.0.0.1" val port = "6532".toInt val server = new NettyServer server.bind(host, port) } }10833
>
package com.spark.netty import io.netty.bootstrap.Bootstrap import io.netty.channel.ChannelInitializer import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.nio.{NioSocketChannel, NioServerSocketChannel} import io.netty.handler.codec.serialization.{ClassResolvers, ObjectDecoder, ObjectEncoder} /** * Created by root on 2016/11/18. */ class NettyClient { def connect(host: String, port: Int): Unit = { //创建客户端NIO线程组 val eventGroup = new NioEventLoopGroup //创建客户端辅助启动类 val bootstrap = new Bootstrap try { //将NIO线程组传入到Bootstrap bootstrap.group(eventGroup) //创建NioSocketChannel .channel(classOf[NioSocketChannel]) //绑定I/O事件处理类 .handler(new ChannelInitializer[SocketChannel] { override def initChannel(ch: SocketChannel): Unit = { ch.pipeline().addLast( // new ObjectEncoder, // new ObjectDecoder(ClassResolvers.cacheDisabled(getClass.getClassLoader)), new ClientHandler ) } }) //发起异步连接操作 val channelFuture = bootstrap.connect(host, port).sync() //等待服务关闭 channelFuture.channel().closeFuture().sync() } finally { //优雅的退出,释放线程池资源 eventGroup.shutdownGracefully() } } } object NettyClient { def main(args: Array[String]) { val host = "127.0.0.1" val port = "6532".toInt val client = new NettyClient client.connect(host, port) } }package com.spark.netty import io.netty.buffer.{Unpooled, ByteBuf} import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter} /** * Created by root on 2016/11/18. */ class ServerHandler extends ChannelInboundHandlerAdapter { /** * 有客户端建立连接后调用 */ override def channelActive(ctx: ChannelHandlerContext): Unit = { println("channelActive invoked") } /** * 接受客户端发送来的消息 */ override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = { println("channelRead invoked") val byteBuf = msg.asInstanceOf[ByteBuf] val bytes = new Array[Byte](byteBuf.readableBytes()) byteBuf.readBytes(bytes) val message = new String(bytes, "UTF-8") println(message) val back = "good boy!" val resp = Unpooled.copiedBuffer(back.getBytes("UTF-8")) println(msg) ctx.write(resp) } /** * 将消息对列中的数据写入到SocketChanne并发送给对方 */ override def channelReadComplete(ctx: ChannelHandlerContext): Unit = { println("channekReadComplete invoked") ctx.flush() } }package com.spark.netty import io.netty.buffer.{ByteBuf, Unpooled} import io.netty.channel.{ChannelInboundHandlerAdapter, ChannelHandlerContext, ChannelHandlerAdapter} /** * Created by root on 2016/11/18. */ class ClientHandler extends ChannelInboundHandlerAdapter { override def channelActive(ctx: ChannelHandlerContext): Unit = { println("channelActive") val content = "hello server" ctx.writeAndFlush(Unpooled.copiedBuffer(content.getBytes("UTF-8"))) //发送case class 不在发送字符串了,封装一个字符串 // ctx.writeAndFlush(RegisterMsg("hello server")) } override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = { println("channelRead") val byteBuf = msg.asInstanceOf[ByteBuf] val bytes = new Array[Byte](byteBuf.readableBytes()) byteBuf.readBytes(bytes) val message = new String(bytes, "UTF-8") println(message) } override def channelReadComplete(ctx: ChannelHandlerContext): Unit = { println("channeReadComplete") ctx.flush() } //发送异常时关闭 override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = { println("exceptionCaught") ctx.close() } }package com.spark.netty /** * Created by root on 2016/11/18. */ case class RegisterMsg(content: String) extends Serializable
先启动NettyServer,然后在启动NettyClient.打印结果
相关文章推荐
- spark学习-75-源代码:Endpoint模型介绍(6)-Endpoint的消息的接收(2)
- spark学习-74-源代码:Endpoint模型介绍(6)-Endpoint的消息的接收
- spark学习-70-源代码:Endpoint模型介绍(2)-启动流程
- spark学习-71-源代码:Endpoint模型介绍(3)-Endpoint Send&Ask流程
- 【Spark2.0源码学习】-3.Endpoint模型介绍
- Spark入门到精通视频学习资料--第二章:Spark生态系统介绍,Spark整体概述与Spark编程模型(2讲)
- 【Spark2.0源码学习】-3.Endpoint模型介绍
- netty学习(一)--linux下的网络io模型简单介绍
- Netty源代码学习——ChannelPipeline模型分析
- spark学习-63-源代码:schedulerBackend和taskScheduler的创建(1)-local
- spark学习-65-源代码:schedulerBackend和taskScheduler的创建(3)-local-cluster
- 3个netty5的例子,简单介绍netty的用法
- Netty IO线程模型学习总结
- spark学习-61-源代码:ShutdownHookManager虚拟机关闭钩子管理器
- spark学习-29-源代码解析从start-all.sh脚本开始
- 源代码学习辅助工具介绍
- 韩顺平_php从入门到精通_视频教程_第13讲_选择器使用细节_块元素和行内元素_盒子模型_盒子模型经典应用①_学习笔记_源代码图解_PPT文档整理
- 【Spark2.0源码学习】-8.SparkContext与Application介绍
- 韩顺平_轻松搞定网页设计(html+css+javascript)_第19讲_js运行原理_js开发工具介绍_js程序(hello)_js基本语法_学习笔记_源代码图解_PPT文档整理
- Python机器学习实践例子——Titanic乘客生存预测模型分析