您的位置:首页 > 运维架构 > 网站架构

基于Netty3的RPC架构笔记3之线程模型源码分析

2017-02-05 21:33 761 查看
      随着用户量上升,项目的架构也在不断地升级:由最开始MVC的垂直架构(传统项目)到RPC架构(webservice,rest,netty,mina),再到SOA模型(dubbo),再到最近的微服务。IO模型也是如此:Tomcat6之前的IO模型都是BIO,也就是阻塞IO;到后来变成多路复用(本质上仍是阻塞IO),再到非阻塞的NIO,再到异步非阻塞的AIO。

     言归正传,接着谈netty。传统IO是一个线程服务一个客户,后来通过netty,可以一个线程服务多个客户。下面的那个图展示的是netty的NIO通过引入多线程来提高性能,即一个线程负责一片用户。


直接上代码

package com.cn;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import com.cn.pool.NioSelectorRunnablePool;
/**
 * Program entry point.
 */
public class Start {

    /**
     * Creates the selector pool, binds the server to port 10101 and prints
     * {@code start} once the bind call has returned.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        // The first executor is handed to the boss loops, the second to the worker loops.
        final NioSelectorRunnablePool pool = new NioSelectorRunnablePool(
                Executors.newCachedThreadPool(),
                Executors.newCachedThreadPool());

        // Build the server on top of the pool and bind the listening port in one go.
        final InetSocketAddress listenAddress = new InetSocketAddress(10101);
        new ServerBootstrap(pool).bind(listenAddress);

        System.out.println("start");
    }

}
package com.cn.pool;

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import com.cn.NioServerBoss;
import com.cn.NioServerWorker;
/**
 * Manager of the selector loops: creates the boss and worker loops and hands
 * them out in rotation.
 */
public class NioSelectorRunnablePool {

    /** Rotation counter used to pick the next boss. */
    private final AtomicInteger bossCursor = new AtomicInteger();

    /** Boss loops. */
    private Boss[] bossPool;

    /** Rotation counter used to pick the next worker. */
    private final AtomicInteger workerCursor = new AtomicInteger();

    /** Worker loops. */
    private Worker[] workerPool;

    /**
     * Creates one boss loop and (available processors * 2) worker loops.
     *
     * @param boss   executor passed to every boss loop
     * @param worker executor passed to every worker loop
     */
    public NioSelectorRunnablePool(Executor boss, Executor worker) {
        initBoss(boss, 1);
        initWorker(worker, Runtime.getRuntime().availableProcessors() * 2);
    }

    /**
     * Creates the boss loops.
     *
     * @param boss  executor passed to every boss loop
     * @param count number of boss loops to create
     */
    private void initBoss(Executor boss, int count) {
        // Publish the array before filling it: each loop receives a reference to this pool.
        this.bossPool = new NioServerBoss[count];
        int slot = 0;
        while (slot < bossPool.length) {
            final String name = "boss thread " + (slot + 1);
            bossPool[slot] = new NioServerBoss(boss, name, this);
            slot++;
        }
    }

    /**
     * Creates the worker loops.
     *
     * @param worker executor passed to every worker loop
     * @param count  number of worker loops to create
     */
    private void initWorker(Executor worker, int count) {
        // Publish the array before filling it: each loop receives a reference to this pool.
        this.workerPool = new NioServerWorker[count];
        int slot = 0;
        while (slot < workerPool.length) {
            final String name = "worker thread " + (slot + 1);
            workerPool[slot] = new NioServerWorker(worker, name, this);
            slot++;
        }
    }

    /**
     * Returns the next worker in rotation.
     *
     * @return a worker from the pool
     */
    public Worker nextWorker() {
        final int ticket = workerCursor.getAndIncrement();
        // abs() of the remainder keeps the index valid after the counter wraps negative.
        final int slot = Math.abs(ticket % workerPool.length);
        return workerPool[slot];
    }

    /**
     * Returns the next boss in rotation.
     *
     * @return a boss from the pool
     */
    public Boss nextBoss() {
        final int ticket = bossCursor.getAndIncrement();
        // abs() of the remainder keeps the index valid after the counter wraps negative.
        final int slot = Math.abs(ticket % bossPool.length);
        return bossPool[slot];
    }

}

package com.cn;

import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;

import com.cn.pool.Boss;
import com.cn.pool.NioSelectorRunnablePool;
/**
 * Server bootstrap: opens a listening channel and hands it to a boss loop.
 */
public class ServerBootstrap {

    /** Pool from which the boss loop is taken. */
    private final NioSelectorRunnablePool selectorRunnablePool;

    /**
     * @param selectorRunnablePool pool that supplies the boss loop used by {@link #bind}
     */
    public ServerBootstrap(NioSelectorRunnablePool selectorRunnablePool) {
        this.selectorRunnablePool = selectorRunnablePool;
    }

    /**
     * Binds a non-blocking server socket to the given address and registers
     * it with a boss loop.
     *
     * <p>Failures are reported with a stack trace and not rethrown. If a
     * channel was already opened when the failure happened, it is closed so
     * that the socket is not leaked.
     *
     * @param localAddress address to listen on
     */
    public void bind(final SocketAddress localAddress) {
        ServerSocketChannel serverChannel = null;
        try {
            // Open a server socket channel.
            serverChannel = ServerSocketChannel.open();
            // Switch it to non-blocking mode.
            serverChannel.configureBlocking(false);
            // Bind the underlying server socket to the requested address.
            serverChannel.socket().bind(localAddress);

            // Pick a boss loop.
            Boss nextBoss = selectorRunnablePool.nextBoss();
            // Ask the boss to register the server channel.
            nextBoss.registerAcceptChannelTask(serverChannel);
        } catch (Exception e) {
            e.printStackTrace();
            // The channel is useless after a failed setup; release it.
            closeQuietly(serverChannel);
        }
    }

    /**
     * Closes the channel if it was opened, reporting (not propagating) a close failure.
     *
     * @param channel channel to close; may be {@code null}
     */
    private static void closeQuietly(ServerSocketChannel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (java.io.IOException closeFailure) {
            closeFailure.printStackTrace();
        }
    }
}
package com.cn.pool;

import java.nio.channels.SocketChannel;
/**
* worker接口
*
*/
public interface Worker {

/**
* 加入一个新的客户端会话
* @param channel
*/
public void registerNewChannelTask(SocketChannel channel);

}
package com.cn.pool;

import java.nio.channels.ServerSocketChannel;
/**
* boss接口
*
*/
public interface Boss {

/**
* 加入一个新的ServerSocket
* @param serverChannel
*/
public void registerAcceptChannelTask(ServerSocketChannel serverChannel);
}
package com.cn;

import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

import com.cn.pool.NioSelectorRunnablePool;

/**
 * Base class for a selector loop: owns one {@link Selector} and a task queue,
 * and runs forever as {@code select -> run queued tasks -> process ready keys}.
 */
public abstract class AbstractNioSelector implements Runnable {

    /**
     * Executor that runs this loop.
     */
    private final Executor executor;

    /**
     * Selector driven by this loop.
     */
    protected Selector selector;

    /**
     * Wake-up flag: set to {@code true} by {@link #registerTask(Runnable)} when it
     * wakes the selector, reset to {@code false} at the start of every loop round.
     */
    protected final AtomicBoolean wakenUp = new AtomicBoolean();

    /**
     * Tasks queued by other threads, executed on the loop thread.
     */
    private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();

    /**
     * Name given to the thread that runs this loop.
     */
    private final String threadName;

    /**
     * Pool that manages this loop.
     */
    protected NioSelectorRunnablePool selectorRunnablePool;

    /**
     * Opens the selector and submits this loop to the executor.
     *
     * <p>Note: the loop is submitted from inside the constructor, so it may
     * start running before a subclass constructor has finished.
     *
     * @param executor             executor that runs the loop
     * @param threadName           name given to the loop thread
     * @param selectorRunnablePool pool that manages this loop
     */
    AbstractNioSelector(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
        this.executor = executor;
        this.threadName = threadName;
        this.selectorRunnablePool = selectorRunnablePool;
        openSelector();
    }

    /**
     * Opens the selector and starts the loop.
     *
     * @throws RuntimeException if the selector cannot be opened; the
     *                          underlying {@link IOException} is kept as the cause
     */
    private void openSelector() {
        try {
            this.selector = Selector.open();
        } catch (IOException e) {
            // Keep the cause so the real reason is visible to the caller.
            throw new RuntimeException("Failed to create a selector.", e);
        }
        executor.execute(this);
    }

    /**
     * Loop body: never returns. An exception thrown by one round is reported
     * and the loop continues with the next round.
     */
    @Override
    public void run() {

        Thread.currentThread().setName(this.threadName);

        while (true) {
            try {
                wakenUp.set(false);

                select(selector);

                // If a wake-up was requested, wakenUp stays true until the next
                // round resets it, and registerTask() skips selector.wakeup()
                // while it is true. A task queued after processTaskQueue() has
                // drained the queue would then sit there while the next select
                // blocks. Re-issuing the wake-up makes the next select return
                // immediately so such a task is picked up.
                if (wakenUp.get()) {
                    selector.wakeup();
                }

                processTaskQueue();

                process(selector);
            } catch (Exception e) {
                // Keep the loop alive, but do not hide the failure.
                e.printStackTrace();
            }
        }

    }

    /**
     * Queues a task for the loop thread and wakes the selector.
     *
     * @param task task to run on the loop thread
     */
    protected final void registerTask(Runnable task) {
        taskQueue.add(task);

        Selector selector = this.selector;

        if (selector != null) {
            // Only the caller that flips the flag pays for the wakeup() call.
            if (wakenUp.compareAndSet(false, true)) {
                selector.wakeup();
            }
        } else {
            // No selector to run the task on: withdraw it.
            taskQueue.remove(task);
        }
    }

    /**
     * Runs queued tasks until the queue is empty.
     */
    private void processTaskQueue() {
        for (;;) {
            final Runnable task = taskQueue.poll();
            if (task == null) {
                break;
            }
            task.run();
        }
    }

    /**
     * Returns the pool that manages this loop.
     *
     * @return the managing pool
     */
    public NioSelectorRunnablePool getSelectorRunnablePool() {
        return selectorRunnablePool;
    }

    /**
     * Performs the select call for this loop.
     *
     * @param selector selector to wait on
     * @return value returned by the select call
     * @throws IOException if the select call fails
     */
    protected abstract int select(Selector selector) throws IOException;

    /**
     * Handles the keys selected by the last select call.
     *
     * @param selector selector whose selected keys are to be handled
     * @throws IOException if handling a key fails
     */
    protected abstract void process(Selector selector) throws IOException;

}
package com.cn;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;

import com.cn.pool.Boss;
import com.cn.pool.NioSelectorRunnablePool;
import com.cn.pool.Worker;
/**
 * Boss implementation: accepts incoming connections and passes each one to a worker.
 */
public class NioServerBoss extends AbstractNioSelector implements Boss {

    /**
     * @param executor             executor that runs this loop
     * @param threadName           name given to the loop thread
     * @param selectorRunnablePool pool from which workers are obtained
     */
    public NioServerBoss(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
        super(executor, threadName, selectorRunnablePool);
    }

    /**
     * Accepts one connection per selected key and hands it to the next worker.
     *
     * @param selector selector whose selected keys are handled
     * @throws IOException if accepting or configuring a connection fails; the
     *                     affected client channel is closed before the
     *                     exception propagates
     */
    @Override
    protected void process(Selector selector) throws IOException {
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        if (selectedKeys.isEmpty()) {
            return;
        }

        for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
            SelectionKey key = i.next();
            i.remove();
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            // New client. In non-blocking mode accept() returns null when no
            // connection is pending, so there may be nothing to hand off.
            SocketChannel channel = server.accept();
            if (channel == null) {
                continue;
            }

            boolean handedOff = false;
            try {
                // Switch the client channel to non-blocking mode.
                channel.configureBlocking(false);
                // Pick a worker.
                Worker nextworker = getSelectorRunnablePool().nextWorker();
                // Queue the registration of the new client on that worker.
                nextworker.registerNewChannelTask(channel);
                handedOff = true;
            } finally {
                if (!handedOff) {
                    // Nobody owns the channel yet; close it so the socket is not leaked.
                    closeQuietly(channel);
                }
            }

            System.out.println("新客户端链接");
        }
    }

    /**
     * Queues a task that registers the server channel with this loop's
     * selector for accept events.
     *
     * @param serverChannel server channel to register
     */
    @Override
    public void registerAcceptChannelTask(final ServerSocketChannel serverChannel) {
        final Selector selector = this.selector;
        registerTask(new Runnable() {
            @Override
            public void run() {
                try {
                    // Register the server channel with the selector.
                    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    /**
     * Blocks until at least one key is ready or the selector is woken up.
     */
    @Override
    protected int select(Selector selector) throws IOException {
        return selector.select();
    }

    /**
     * Closes a client channel, reporting (not propagating) a close failure.
     *
     * @param channel channel to close
     */
    private static void closeQuietly(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException closeFailure) {
            closeFailure.printStackTrace();
        }
    }

}
package com.cn;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;

import com.cn.pool.NioSelectorRunnablePool;
import com.cn.pool.Worker;
/**
 * Worker implementation: reads from client channels and writes back a fixed reply.
 */
public class NioServerWorker extends AbstractNioSelector implements Worker {

    /**
     * @param executor             executor that runs this loop
     * @param threadName           name given to the loop thread
     * @param selectorRunnablePool pool that manages this loop
     */
    public NioServerWorker(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
        super(executor, threadName, selectorRunnablePool);
    }

    /**
     * For every selected key: reads up to 1024 bytes, prints them and writes
     * back a fixed reply. A channel that reports end of stream, or fails on
     * read or write, is deregistered and closed.
     *
     * @param selector selector whose selected keys are handled
     * @throws IOException declared by the base class; read, write and close
     *                     failures are handled per channel and not rethrown
     */
    @Override
    protected void process(Selector selector) throws IOException {
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        if (selectedKeys.isEmpty()) {
            return;
        }
        Iterator<SelectionKey> ite = selectedKeys.iterator();
        while (ite.hasNext()) {
            SelectionKey key = ite.next();
            // Remove the key so it is not handled twice.
            ite.remove();

            // Channel on which the event occurred.
            SocketChannel channel = (SocketChannel) key.channel();

            ByteBuffer buffer = ByteBuffer.allocate(1024);
            // Number of bytes read; -1 means end of stream.
            int ret;
            try {
                ret = channel.read(buffer);
            } catch (Exception e) {
                // Any read failure is treated as a broken connection.
                ret = -1;
            }

            if (ret < 0) {
                disconnect(key, channel);
                continue;
            }
            if (ret == 0) {
                // Nothing available right now; the connection is still open.
                continue;
            }

            // Decode only the bytes actually read, not the whole backing array.
            // The platform default charset is used, as for the reply below.
            System.out.println("收到数据:" + new String(buffer.array(), 0, ret));

            // Write the reply back to the client.
            ByteBuffer outBuffer = ByteBuffer.wrap("收到\n".getBytes());
            try {
                channel.write(outBuffer);
            } catch (IOException e) {
                // A failed write must not stop the remaining ready keys from being handled.
                disconnect(key, channel);
            }
        }
    }

    /**
     * Queues a task that registers a new client channel with this loop's
     * selector for read events.
     *
     * @param channel client channel to register
     */
    @Override
    public void registerNewChannelTask(final SocketChannel channel) {
        final Selector selector = this.selector;
        registerTask(new Runnable() {
            @Override
            public void run() {
                try {
                    // Register the client channel with the selector.
                    channel.register(selector, SelectionKey.OP_READ);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    /**
     * Waits up to 500 ms for a key to become ready.
     */
    @Override
    protected int select(Selector selector) throws IOException {
        return selector.select(500);
    }

    /**
     * Deregisters the key and closes the channel so the socket is released.
     *
     * @param key     selection key of the client
     * @param channel channel of the client
     */
    private void disconnect(SelectionKey key, SocketChannel channel) {
        key.cancel();
        try {
            channel.close();
        } catch (IOException closeFailure) {
            closeFailure.printStackTrace();
        }
        System.out.println("客户端断开连接");
    }

}

上面的例子是直接引用jar,也可以在项目中直接引入netty的源码,从而理解netty的工作原理。

试想我们如何提高NIO的工作效率,一个NIO是不是只能有一个selector?当然不是,一个系统可以有多个selector

一个selector上可以注册多个ServerSocketChannel。

我们如何去看一个开源框架的代码:

一、断点(多线程的情况下可以设置断点的条件,指定只在某个线程上触发)

二、打印

三、看调用栈

四、搜索
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  netty