您的位置:首页 > 编程语言 > Java开发

Spring整合Dubbo,使用zookeeper作为注册中心,进行远程调用及负载均衡、自动失效转移(何志雄)

2015-04-01 09:39 761 查看
1、Mavn依赖:

<span style="white-space:pre">			</span><!-- dubbo begin -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo</artifactId>
<version>2.5.3</version>
<exclusions>
<exclusion>
<artifactId>spring</artifactId>
<groupId>org.springframework</groupId>
</exclusion>
</exclusions>
<scope>runtime</scope>
</dependency>
<!-- dubbo end -->

<!-- zookeeper begin -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.4</version>
</dependency>
<!-- zookeeper end -->

2、定义服务端提供方和消费方的通信接口

IDispatch.java

package com.gaojiasoft.test.dubbo;

//服务接口,dubbo服务的提供方及消费方都需要使用
public interface IDispatch
{

/**
* 有返回值,此时异步调用是收不到返回值的,需要在调用方增加回调接口获取返回值。
* @param msg
* @return
*/
String dispatch(String msg);

/**
* 没有返回值的,比较适合异步调用
* @param msg
*/
void noReturn(String msg);

}


3、在服务提供方实现具体的服务(在注销代码中,笔者视图使用多线程来处理客户端的请求,事实证明Dubbo中多此一举,Dubbo是基于NIO的TCP通信,本身已经是线程池)

DispatchImpl.java

package com.gaojiasoft.test.dubbo.provider;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.gaojiasoft.test.dubbo.IDispatch;

public class DispatchImpl implements IDispatch {
public static final Logger log = LoggerFactory
.getLogger(DispatchImpl.class);

/**
*不需要使用多线程处理客户端请求,因为dubbo的服务端是NIO,内部含有多线程复用
*/
private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(
50, 150, 30L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new BasicThreadFactory.Builder().daemon(true)
.namingPattern("Dubbo-Provider-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy());

@Override
public String dispatch(final String msg) {
//		String back = null;
//		Future<Object> future = executor.submit(new Callable<Object>() {
//			@Override
//			public Object call() throws Exception {
//
//				log.debug("Server1 get msg:"+msg);
//				Thread.sleep(1);
//				return "SERVER 1 retrun:" + msg;
//
//			}
//		});
//		try {
//			back = future.get().toString();
//		} catch (InterruptedException | ExecutionException e) {
//			e.printStackTrace();
//		}
//		return back;
log.debug("Server1 get msg:"+msg);
//Thread.sleep(1);
return "SERVER 1 retrun:" + msg;
}

@Override
public void noReturn(String msg) {

}
}


4、服务提供方的Spring配置

<?xml version="1.0" encoding="UTF-8"?>
<!-- dubbo服务提供方 -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd"> 
<!-- 提供方应用信息,用于计算依赖关系 -->
<dubbo:application name="Server-B" />
<!-- 使用zookeeper对外暴露服务 -->
<dubbo:registry protocol="zookeeper"
address="192.168.62.155:2181,192.168.62.153:2181,192.168.62.154:2181" />

<!-- 用dubbo协议在20880端口暴露服务 -->
<!-- 提供服务的线程池固定数量为100,iothreads配置CPU个数+1 -->
<dubbo:protocol name="dubbo" port="20880" dispatcher="message"
threadpool="fixed" threads="100" iothreads="3" />

<!-- 声明需要暴露的服务接口 及实现类-,版本号2.0.0,配置同样版本号的消费者调用同样版本号的生产者,executes="200"服务提供者线程池线程数,
loadbalance="leastactive"是负载服务端并发数量最小的,可选值random:随机,roundrobin:轮询,leastactive:最小并发 -->
<dubbo:service interface="com.gaojiasoft.test.dubbo.IDispatch"
ref="disPatchImpl" version="1.0.0" loadbalance="roundrobin" group="hs">
</dubbo:service>

<!-- 和本地bean一样实现服务 -->
<bean id="disPatchImpl" class="com.gaojiasoft.test.dubbo.provider.DispatchImpl" />
</beans>


关于配置文件的介绍:

1、<dubbo:protocol>中的dispatcher可选择

0. all 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。
1. direct 所有消息都不派发到线程池,全部在IO线程上直接执行。
2. message 只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在IO线程上执行。
3. execution 只请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在IO线程上执行。
4. connection 在IO线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。

2、<dubbo:server>中的group适用于多个系统都有同样名称的服务,此时,通过分组加以区分,此时消费方配置同样名称分组的就会调用同样名称的消费方。

3、<dubbo:server>中的version使用一个服务有多个版本,此时服务端提供多,可以提供多个名称同样的方法,使用不通的版本号。例如:

<dubbo:service interface="com.gaojiasoft.test.dubbo.IDispatch"
ref="<strong>disPatchImpl</strong>"<strong> version="1.0.0"</strong> loadbalance="roundrobin" group="hs">
</dubbo:service>
<dubbo:service interface="com.gaojiasoft.test.dubbo.IDispatch"
ref="<strong>disPatchImpl</strong>" <strong>version="2.0.0"</strong> loadbalance="roundrobin" group="hs">
</dubbo:service>
可以看到,服务名称一致,版本号不一直的服务被同时暴露,此时客户端按照自己的需求调用不同的版本号。就使得整个平台的可扩展性大大加强,非常使用整个平台的向下兼容的能力。

5、服务提供方的启动类
DubboTest.java

package com.gaojiasoft.test.dubbo;

import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class DubboTest {

private static ConfigurableApplicationContext contextProvider;

private static ConfigurableApplicationContext contextConsumer;

private Logger logger = LoggerFactory.getLogger("DubboTest");

private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(
50, 150, 30L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new BasicThreadFactory.Builder().daemon(true)
.namingPattern("Dubbo-Consumer-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy());

@Test
public void test() throws IOException, InterruptedException {

// String []list = new
// String[]{"classpath:conf/dubbo/spring-dubbo-provider.xml","classpath:conf/dubbo/spring-dubbo-consumer.xml"};
// contextProvider = new ClassPathXmlApplicationContext(list,true,null);

contextProvider = new ClassPathXmlApplicationContext(
"classpath:conf/dubbo/spring-dubbo-provider.xml");

while (true) {
Thread.sleep(1);
}

//		contextConsumer = new ClassPathXmlApplicationContext(
//				"classpath:conf/dubbo/spring-dubbo-consumer.xml");
//
//		IDispatch service = (IDispatch) contextConsumer.getBean("disPatcher");
//
//		int i = 1;
//		while (true) {
//			try {
//				// 多线程消费
//				mulitThreadConsumer(service, "" + i);
//			} catch (Exception e) {
//				logger.error("exception,dubbo provider error!");
//			}
//			Thread.sleep(1000);
//			i++;
//		}

}

/**
* 多线程消费者
*
* @param keyvalue
*/
private void mulitThreadConsumer(final IDispatch service,
final String message) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
String revice = service.dispatch(message);
logger.debug("通过dubbo调用服务,返回的消息是:" + revice);
Thread.sleep(1);

} catch (Throwable th) {
// 防御性容错,避免高并发下的一些问题
logger.error("", th);
}
}
});
}
}


6、服务端消费方的Spring配置

<?xml version="1.0" encoding="UTF-8"?>
<!-- dubbo服务消费方 -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd"> 
<!-- 消费方应用信息,用于计算依赖关系,与提供方不要一样 -->
<dubbo:application name="Server-A" />

<!-- 使用zookeeper注册中心暴露服务地址 -->
<dubbo:registry protocol="zookeeper" address="192.168.62.155:2181,192.168.62.153:2181,192.168.62.154:2181" />

<!-- 生成远程服务代理,与服务提供者建立3个TCP连接,并采用异步调用(非阻塞) -->
<dubbo:reference id="disPatcher" connections="3"
interface="com.gaojiasoft.test.dubbo.IDispatch" version="1.0.0" sent="false"
async="true" group="hs">
<!-- active是消费方请求线程池线程数,可针对不同的方法声明不同的线程数-->
<dubbo:method name="dispatch" return="false" actives="50"></dubbo:method>
</dubbo:reference>
</beans>


7、服务消费方的启动类
DubboConsumerStart.java

package com.study.dubbo.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.gaojiasoft.test.dubbo.IDispatch;

public class DubboConsumerStart
{
public static final Logger log = LoggerFactory.getLogger(DubboConsumerStart.class);

public static void main(String[] args) throws Exception
{
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("conf/spring-dubbo-consumer.xml");
context.start();

IDispatch service = (IDispatch) context.getBean("dispatcher");
int i =1;
while(true)
{
try
{
String revice = service.dispatch(""+i);
log.debug("通过dubbo调用服务,返回的消息是:" + revice);
Thread.sleep(1);
i++;
}catch(Exception e)
{
log.debug("调用异常:" + e.getClass().getName());
}
}

// System.in.read(); //为保证服务一直开着,利用输入流的阻塞来模拟
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐