您的位置:首页 > 其它

dubbo入门

2016-04-03 01:42 288 查看
dubbo是阿里巴巴开源的单一长连接服务框架,底层通信采用nio框架,支持netty,mina,grizzly,默认是netty。对dubbo比较感兴趣的是:

1. client端的线程模型是什么样的?

传统的io client是请求应答模式,发送请求-->等待远程应答。dubbo底层是异步IO的,所有请求复用单一长连接,所以调用都不会阻在IO上,而是阻在Future超时wait上。

2. server端的线程模型是什么样的?

这个比较成熟了,现在一般的server都是基于nio,一批io thread负责处理io,一批worker thread负责处理业务。

 

 一. 快速启动

学习dubbo最好的方式是快速运行起来,由于dubbo还是比较重量级的产品,之前遇到一些问题。

server端:

 

Java代码  


import java.io.IOException;  

  

import com.alibaba.dubbo.config.ApplicationConfig;  

import com.alibaba.dubbo.config.ProtocolConfig;  

import com.alibaba.dubbo.config.ServiceConfig;  

import com.duitang.dboss.client.test.BlogQueryService;  

import com.duitang.dboss.client.test.BlogQueryServiceImpl;  

  

public class DubboServerTester {  

  

    public static void main(String[] args) throws IOException {  

        BlogQueryService blogQueryService = new BlogQueryServiceImpl();  

        ApplicationConfig application = new ApplicationConfig();  

        application.setName("dubbo-test");  

  

        ProtocolConfig protocol = new ProtocolConfig();  

        protocol.setName("dubbo");  

        protocol.setPort(8989);  

        protocol.setThreads(200);  

  

        // RegistryConfig registry = new RegistryConfig();  

        // registry.setAddress("10.20.130.230:9090");  

        // registry.setUsername("aaa");  

        // registry.setPassword("bbb");  

  

        ServiceConfig<BlogQueryService> service = new ServiceConfig<BlogQueryService>(); // 此实例很重,封装了与注册中心的连接,请自行缓存,否则可能造成内存和连接泄漏  

        service.setApplication(application);  

  

        // service.setRegistry(registry);  

        service.setRegister(false);  

        service.setProtocol(protocol); // 多个协议可以用setProtocols()  

        service.setInterface(BlogQueryService.class);  

        service.setRef(blogQueryService);  

        service.setVersion("1.0.0");  

        // 暴露及注册服务  

        service.export();  

          

        System.out.println("Press any key to exit.");  

        System.in.read();  

    }  

}  

 注意:dubbo export服务默认依赖于RegistryConfig,如果没有配置RegistryConfig会报错.可以通过service.setRegister(false)禁用。

 

client:

 

Java代码  


import java.io.IOException;  

import java.util.ArrayList;  

import java.util.List;  

import java.util.concurrent.Callable;  

import java.util.concurrent.ExecutionException;  

import java.util.concurrent.ExecutorService;  

import java.util.concurrent.Executors;  

import java.util.concurrent.Future;  

import java.util.concurrent.ThreadFactory;  

import java.util.concurrent.atomic.AtomicInteger;  

  

import com.alibaba.dubbo.config.ApplicationConfig;  

import com.alibaba.dubbo.config.ReferenceConfig;  

import com.duitang.dboss.client.test.BlogQueryService;  

  

public class DubboClientTester {  

  

    public static void main(String[] args) throws InterruptedException, IOException {  

        ApplicationConfig application = new ApplicationConfig();  

        application.setName("dubbo-test");  

  

        ReferenceConfig<BlogQueryService> reference = new ReferenceConfig<BlogQueryService>();  

        reference.setUrl("dubbo://127.0.0.1:8989/com.duitang.dboss.client.test.BlogQueryService");  

        reference.setTimeout(500);  

        reference.setConnections(10);  

        reference.setApplication(application);  

        reference.setInterface(BlogQueryService.class);  

        reference.setVersion("1.0.0");  

        final BlogQueryService blogQueryService = reference.get();  

  

        long begin = System.currentTimeMillis();  

        System.out.println(blogQueryService.test());  

        long end = System.currentTimeMillis();  

        System.out.println(" cost:" + (end - begin));  

  

        ExecutorService es = Executors.newFixedThreadPool(50, new NamedThreadFactory("my test"));  

        List<Callable<String>> tasks = new ArrayList<Callable<String>>();  

        for (int i = 0; i < 100000; ++i) {  

            tasks.add(new Callable<String>() {  

  

                @Override  

                public String call() throws Exception {  

                    System.out.println("run");  

                    System.out.println(blogQueryService.test());  

                    System.out.println("run success");  

                    return null;  

                }  

            });  

        }  

        List<Future<String>> futurelist = es.invokeAll(tasks);  

        for (Future<String> future : futurelist) {  

            try {  

                String result = future.get();  

            } catch (ExecutionException e) {  

                e.printStackTrace();  

            }  

            System.out.println("------------------------------------------------------------------------------------------------------------------------------------------------\r\n");  

        }  

        es.shutdown();  

        System.out.println("end");  

        System.in.read();  

    }  

  

    static class NamedThreadFactory implements ThreadFactory {  

  

        private static final AtomicInteger POOL_SEQ   = new AtomicInteger(1);  

  

        private final AtomicInteger        mThreadNum = new AtomicInteger(1);  

  

        private final String               mPrefix;  

  

        private final boolean              mDaemo;  

  

        private final ThreadGroup          mGroup;  

  

        public NamedThreadFactory(){  

            this("pool-" + POOL_SEQ.getAndIncrement(), false);  

        }  

  

        public NamedThreadFactory(String prefix){  

            this(prefix, false);  

        }  

  

        public NamedThreadFactory(String prefix, boolean daemo){  

            mPrefix = prefix + "-thread-";  

            mDaemo = daemo;  

            SecurityManager s = System.getSecurityManager();  

            mGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();  

        }  

  

        public Thread newThread(Runnable runnable) {  

            String name = mPrefix + mThreadNum.getAndIncrement();  

            Thread ret = new Thread(mGroup, runnable, name, 0);  

            ret.setDaemon(mDaemo);  

            return ret;  

        }  

  

        public ThreadGroup getThreadGroup() {  

            return mGroup;  

        }  

  

    }  

}  

 

 

1. 通过setUrl("")来实现远程服务直连。

2. 需要注意的是默认connection只有一个,可以通过setConnections()来指定connection pool。在高负载环境下,nio的单连接也会遇到瓶颈,此时你可以通过设置连接池来让更多的连接分担dubbo的请求负载,从而提高系统的吞吐量。”

二. 代码流程

这里重点分析一下client的调用过程,client调用分为三个部分:

1). 初始化,建立连接。

2). 发送请求。

3). 等待远程应答。

(一).初始化

1. DubboProtocol.initClient() 

2. Exchangers.connect(URL url, ExchangeHandler handler)    

3. Exchangers.getExchanger(url).connect(url, handler)

4. HeaderExchanger.connect(URL url, ExchangeHandler handler)

5. return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));

6. Transporters.getTransporter().connect(URL url, ChannelHandler handler)

7. NettyTransporter.connect(URL url, ChannelHandler listener)

8. new NettyClient(url, listener) //timeout默认值:timeout=1000;connectTimeout=3000; 

9. NettyClient.doOpen()        //创建netty的ClientBootstrap

bootstrap = new ClientBootstrap(channelFactory);

bootstrap.setOption("keepAlive", true);

bootstrap.setOption("tcpNoDelay", true);

bootstrap.setOption("connectTimeoutMillis", getTimeout()); //注意:此timeout是timeout,而非connectTimeout

10. AbstractClient.connect()

11. NettyClient.doConnect()  //如果远程地址无法连接,抛出timeout异常流程结束。

ChannelFuture future = bootstrap.connect(getConnectAddress());

boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);

(二).发送请求

1.DubboInvoker.doInvoke(Invocation invocation) //currentClient.request(invocation, timeout).get()

2.HeaderExchangeClient.request(invocation, timeout)

3.HeaderExchangeChannel.request(Invocation invocation,timeout)

4.AbstractPeer.send(Request request) 

5.NettyChannel.send(Object message, boolean sent)

6.NioClientSocketChannel.write(message)

7.NettyHandler.writeRequested(ChannelHandlerContext ctx, MessageEvent e)

8.AbstractPeer.sent(Channel ch, Request request)

(三).等待远程应答

在调用DubboInvoker.doInvoke(Invocation invocation)中实际是调用currentClient.request(invocation, timeout).get(),此方法会返回DefaultFuture,调用get方法会阻塞直到超时,在阻塞的同时netty的io线程会接收到远程应答,如果收到响应会产生io事件调用NettyHandler.messageReceived。

1.NettyHandler.messageReceived(ChannelHandlerContext ctx, MessageEvent e)

2.AbstractPeer.received(Channel ch, Object msg)

3.MultiMessageHandler.received(Channel channel, Object message)  

4.AllChannelHandler.received(Channel channel, Object message)

5.DecodeHandler.received(Channel channel, Object message)

6.HeaderExchangeHandler.received(Channel channel, Object message) 

7.DefaultFuture.received(Channel channel, Response response)  //注意是static方法

DefaultFuture future = FUTURES.remove(response.getId());

if (future != null) {

    future.doReceived(response);

}

三. dubbo client的核心

我认为dubbo client的核心在DefaultFuture。所以远程调用都不会阻在IO上,而是阻在Future超时wait上,下面忽略掉远程调用把future抽取出来。



 

下面是代码实现

Java代码  


package executor;  

  

import java.util.concurrent.Callable;  

import java.util.concurrent.ExecutionException;  

import java.util.concurrent.ExecutorService;  

import java.util.concurrent.Executors;  

import java.util.concurrent.Future;  

import java.util.concurrent.atomic.AtomicLong;  

  

public class Commands {  

  

    private ExecutorService senders   = Executors.newCachedThreadPool();  

    private ExecutorService receviers = Executors.newCachedThreadPool();  

    private AtomicLong      counter   = new AtomicLong();  

  

    public CommandResponse execute(Callable<Object> task, int timeout) {  

        Future<Object> result = senders.submit(task);  

        long id = counter.getAndIncrement();  

        CommandFuture commandFuture = new CommandFuture(id);  

        receviers.submit(new ReceiveWorker(id, result));  

        return commandFuture.get(timeout);  

    }  

  

    static class ReceiveWorker implements Runnable {  

  

        private Future<Object> result;  

        private Long           id;  

  

        public ReceiveWorker(Long id, Future<Object> result){  

            super();  

            this.result = result;  

            this.id = id;  

        }  

  

        @Override  

        public void run() {  

            try {  

                Object obj = result.get();  

                CommandFuture.received(new CommandResponse(id, obj));  

            } catch (InterruptedException e) {  

                e.printStackTrace();  

            } catch (ExecutionException e) {  

                e.printStackTrace();  

            }  

        }  

    }  

  

    public void shutdown() {  

        senders.shutdown();  

        receviers.shutdown();  

    }  

}  

 

Java代码  


package executor;  

  

import java.util.Map;  

import java.util.concurrent.ConcurrentHashMap;  

import java.util.concurrent.TimeUnit;  

import java.util.concurrent.locks.Condition;  

import java.util.concurrent.locks.Lock;  

import java.util.concurrent.locks.ReentrantLock;  

  

public class CommandFuture {  

  

    private final Lock                            lock    = new ReentrantLock();  

  

    private final Condition                       done    = lock.newCondition();  

  

    private CommandResponse                                response;  

  

    private static final Map<Long, CommandFuture> FUTURES = new ConcurrentHashMap<Long, CommandFuture>();  

  

      

    public CommandFuture(Long id){  

        FUTURES.put(id, this);  

    }  

  

    public boolean isDone() {  

        return response != null;  

    }  

  

    public CommandResponse get(int timeout) {  

  

        if (!isDone()) {  

            long start = System.currentTimeMillis();  

            lock.lock();  

            try {  

                while (!isDone()) {  

                    done.await(timeout, TimeUnit.MILLISECONDS);  

                    if (isDone() || System.currentTimeMillis() - start >= timeout) {  

                        break;  

                    }  

                }  

            } catch (InterruptedException e) {  

                throw new RuntimeException(e);  

            } finally {  

                lock.unlock();  

            }  

            if (!isDone()) {  

                throw new TimeoutException("timeout");  

            }  

        }  

        return response;  

    }  

  

    public void doReceived(CommandResponse response) {  

        lock.lock();  

        try {  

            this.response = response;  

            if (done != null) {  

                done.signal();  

            }  

        } finally {  

            lock.unlock();  

        }  

  

    }  

  

    public static void received(CommandResponse response) {  

        try {  

            CommandFuture future = FUTURES.remove(response.getId());  

            if (future != null) {  

                future.doReceived(response);  

            } else {  

                System.out.println("some error!");  

            }  

        } finally {  

            // CHANNELS.remove(response.getId());  

        }  

    }  

}  

 

Java代码  


package executor;  

  

import java.util.concurrent.Callable;  

import java.util.concurrent.ExecutionException;  

import java.util.concurrent.ExecutorService;  

import java.util.concurrent.Executors;  

import java.util.concurrent.Future;  

import java.util.concurrent.atomic.AtomicLong;  

  

public class Commands {  

  

    private ExecutorService senders   = Executors.newCachedThreadPool();  

    private ExecutorService receviers = Executors.newCachedThreadPool();  

    private AtomicLong      counter   = new AtomicLong();  

  

    public CommandResponse execute(Callable<Object> task, int timeout) {  

        Future<Object> result = senders.submit(task);  

        long id = counter.getAndIncrement();  

        CommandFuture commandFuture = new CommandFuture(id);  

        receviers.submit(new ReceiveWorker(id, result));  

        return commandFuture.get(timeout);  

    }  

  

    static class ReceiveWorker implements Runnable {  

  

        private Future<Object> result;  

        private Long           id;  

  

        public ReceiveWorker(Long id, Future<Object> result){  

            super();  

            this.result = result;  

            this.id = id;  

        }  

  

        @Override  

        public void run() {  

            try {  

                Object obj = result.get();  

                CommandFuture.received(new CommandResponse(id, obj));  

            } catch (InterruptedException e) {  

                e.printStackTrace();  

            } catch (ExecutionException e) {  

                e.printStackTrace();  

            }  

        }  

    }  

  

    public void shutdown() {  

        senders.shutdown();  

        receviers.shutdown();  

    }  

}  

 

下面是jstack



查看图片附件
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: