[转载] Thrift-server与spring集成
2015-10-28 23:49
190 查看
转载自http://shift-alt-ctrl.iteye.com/blog/1990026
Thrift服务server端,其实就是一个ServerSocket线程 + 处理器,当Thrift-client端建立链接之后,处理器负责解析socket流信息,并根据其指定的"方法名"+参数列表,来调用"服务实现类"的方法,并将执行结果(或者异常)写入到socket中.
一个server,就需要创建一个ServerSocket,并侦听本地的一个端口,这种情况对分布式部署,有一些额外的要求:client端需要知道一个"service"被部署在了那些server上.
设计思路:
1) 每个server内部采用threadPool的方式,来提升并发能力.
2) 当server启动成功后,向zookeeper注册服务节点,此后client端就可以"感知到"服务的状态
3) 通过spring的方式,配置thrift-server服务类.
其中zookeepeer注册是可选选项
1.pom.xml
Java代码
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>3.0.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.5</version>
<!--<exclusions>-->
<!--<exclusion>-->
<!--<groupId>log4j</groupId>-->
<!--<artifactId>log4j</artifactId>-->
<!--</exclusion>-->
<!--</exclusions>-->
</dependency>
<!--
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.4</version>
</dependency>
-->
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
<version>1.6</version>
</dependency>
</dependencies>
本实例,使用了apache-curator作为zookeeper客户端.
2. spring-thrift-server.xml
Java代码
<!-- zookeeper -->
<bean id="thriftZookeeper" class="com.demo.thrift.zookeeper.ZookeeperFactory" destroy-method="close">
<property name="connectString" value="127.0.0.1:2181"></property>
<property name="namespace" value="demo/thrift-service"></property>
</bean>
<bean id="sericeAddressReporter" class="com.demo.thrift.support.impl.DynamicAddressReporter" destroy-method="close">
<property name="zookeeper" ref="thriftZookeeper"></property>
</bean>
<bean id="userService" class="com.demo.service.UserServiceImpl"/>
<bean class="com.demo.thrift.ThriftServiceServerFactory" destroy-method="close">
<property name="service" ref="userService"></property>
<property name="configPath" value="UserServiceImpl"></property>
<property name="port" value="9090"></property>
<property name="addressReporter" ref="sericeAddressReporter"></property>
</bean>
3. ThriftServiceServerFactory.java
此类严格上说并不是一个工厂类,它的主要作用就是封装指定的"service" ,然后启动一个server的过程,其中"service"属性表示服务的实现类,addressReporter表示当server启动成功后,需要指定的操作(比如,向zookeeper发送service的IP信息).
究竟当前server的ip地址是多少,在不同的设计中,有所不同,比如:有些管理员喜欢将本机的IP地址写入到os下的某个文件中,如果上层应用需要获取可靠的IP信息,就需要读取这个文件...你可以实现自己的ThriftServerIpTransfer来获取当前server的IP.
为了减少xml中的配置信息,在factory中,使用了反射机制来构建"Processor"类.
Java代码
public class ThriftServiceServerFactory implements InitializingBean {
private Integer port;
private Integer priority = 1;// default
private Object service;// serice实现类
private ThriftServerIpTransfer ipTransfer;
private ThriftServerAddressReporter addressReporter;
private ServerThread serverThread;
private String configPath;
public void setService(Object service) {
this.service = service;
}
public void setPriority(Integer priority) {
this.priority = priority;
}
public void setPort(Integer port) {
this.port = port;
}
public void setIpTransfer(ThriftServerIpTransfer ipTransfer) {
this.ipTransfer = ipTransfer;
}
public void setAddressReporter(ThriftServerAddressReporter addressReporter) {
this.addressReporter = addressReporter;
}
public void setConfigPath(String configPath) {
this.configPath = configPath;
}
@Override
public void afterPropertiesSet() throws Exception {
if (ipTransfer == null) {
ipTransfer = new LocalNetworkIpTransfer();
}
String ip = ipTransfer.getIp();
if (ip == null) {
throw new NullPointerException("cant find server ip...");
}
String hostname = ip + ":" + port + ":" + priority;
Class serviceClass = service.getClass();
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Class<?>[] interfaces = serviceClass.getInterfaces();
if (interfaces.length == 0) {
throw new IllegalClassFormatException("service-class should implements Iface");
}
// reflect,load "Processor";
Processor processor = null;
for (Class clazz : interfaces) {
String cname = clazz.getSimpleName();
if (!cname.equals("Iface")) {
continue;
}
String pname = clazz.getEnclosingClass().getName() + "$Processor";
try {
Class pclass = classLoader.loadClass(pname);
if (!pclass.isAssignableFrom(Processor.class)) {
continue;
}
Constructor constructor = pclass.getConstructor(clazz);
processor = (Processor) constructor.newInstance(service);
break;
} catch (Exception e) {
//
}
}
if (processor == null) {
throw new IllegalClassFormatException("service-class should implements Iface");
}
//需要单独的线程,因为serve方法是阻塞的.
serverThread = new ServerThread(processor, port);
serverThread.start();
// report
if (addressReporter != null) {
addressReporter.report(configPath, hostname);
}
}
class ServerThread extends Thread {
private TServer server;
ServerThread(Processor processor, int port) throws Exception {
TServerSocket serverTransport = new TServerSocket(port);
Factory portFactory = new TBinaryProtocol.Factory(true, true);
Args args = new Args(serverTransport);
args.processor(processor);
args.protocolFactory(portFactory);
server = new TThreadPoolServer(args);
}
@Override
public void run(){
try{
server.serve();
}catch(Exception e){
//
}
}
public void stopServer(){
server.stop();
}
}
public void close() {
serverThread.stopServer();
}
}
4. DynamicAddressReporter.java
在ThriftServiceServerFactory中,有个可选的属性:addressReporter, DynamicAddressReporter提供了向zookeeper注册service信息的能力,当server启动正常后,把server的IP + port发送到zookeeper中;那么此后服务消费client,就可以从zookeeper中获取server列表,并与它们建立链接(池).这样client端只需要关注zookeeper的节点名称即可,不需要配置大量的ip+port.
Java代码
public class DynamicAddressReporter implements ThriftServerAddressReporter {
private CuratorFramework zookeeper;
public DynamicAddressReporter(){}
public DynamicAddressReporter(CuratorFramework zookeeper){
this.zookeeper = zookeeper;
}
public void setZookeeper(CuratorFramework zookeeper) {
this.zookeeper = zookeeper;
}
@Override
public void report(String service, String address) throws Exception {
if(zookeeper.getState() == CuratorFrameworkState.LATENT){
zookeeper.start();
zookeeper.newNamespaceAwareEnsurePath(service);
}
zookeeper.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(service +"/i_",address.getBytes("utf-8"));
}
public void close(){
zookeeper.close();
}
}
5. 测试类
Java代码
public class ServiceMain {
/**
* @param args
*/
public static void main(String[] args) {
try {
ApplicationContext context = new ClassPathXmlApplicationContext("spring-thrift-server.xml");
Thread.sleep(3000000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
本文就不在展示如何使用thrift文件生成service API的过程,请参考[Thrift简介]
Thrift-client端代码开发与配合,请参见[Thrift-client]
更多代码,请参考附件.
Thrift服务server端,其实就是一个ServerSocket线程 + 处理器,当Thrift-client端建立链接之后,处理器负责解析socket流信息,并根据其指定的"方法名"+参数列表,来调用"服务实现类"的方法,并将执行结果(或者异常)写入到socket中.
一个server,就需要创建一个ServerSocket,并侦听本地的一个端口,这种情况对分布式部署,有一些额外的要求:client端需要知道一个"service"被部署在了那些server上.
设计思路:
1) 每个server内部采用threadPool的方式,来提升并发能力.
2) 当server启动成功后,向zookeeper注册服务节点,此后client端就可以"感知到"服务的状态
3) 通过spring的方式,配置thrift-server服务类.
其中zookeepeer注册是可选选项
1.pom.xml
Java代码
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>3.0.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.5</version>
<!--<exclusions>-->
<!--<exclusion>-->
<!--<groupId>log4j</groupId>-->
<!--<artifactId>log4j</artifactId>-->
<!--</exclusion>-->
<!--</exclusions>-->
</dependency>
<!--
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.4</version>
</dependency>
-->
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
<version>1.6</version>
</dependency>
</dependencies>
本实例,使用了apache-curator作为zookeeper客户端.
2. spring-thrift-server.xml
Java代码
<!-- zookeeper -->
<bean id="thriftZookeeper" class="com.demo.thrift.zookeeper.ZookeeperFactory" destroy-method="close">
<property name="connectString" value="127.0.0.1:2181"></property>
<property name="namespace" value="demo/thrift-service"></property>
</bean>
<bean id="sericeAddressReporter" class="com.demo.thrift.support.impl.DynamicAddressReporter" destroy-method="close">
<property name="zookeeper" ref="thriftZookeeper"></property>
</bean>
<bean id="userService" class="com.demo.service.UserServiceImpl"/>
<bean class="com.demo.thrift.ThriftServiceServerFactory" destroy-method="close">
<property name="service" ref="userService"></property>
<property name="configPath" value="UserServiceImpl"></property>
<property name="port" value="9090"></property>
<property name="addressReporter" ref="sericeAddressReporter"></property>
</bean>
3. ThriftServiceServerFactory.java
此类严格上说并不是一个工厂类,它的主要作用就是封装指定的"service" ,然后启动一个server的过程,其中"service"属性表示服务的实现类,addressReporter表示当server启动成功后,需要指定的操作(比如,向zookeeper发送service的IP信息).
究竟当前server的ip地址是多少,在不同的设计中,有所不同,比如:有些管理员喜欢将本机的IP地址写入到os下的某个文件中,如果上层应用需要获取可靠的IP信息,就需要读取这个文件...你可以实现自己的ThriftServerIpTransfer来获取当前server的IP.
为了减少xml中的配置信息,在factory中,使用了反射机制来构建"Processor"类.
Java代码
public class ThriftServiceServerFactory implements InitializingBean {
private Integer port;
private Integer priority = 1;// default
private Object service;// serice实现类
private ThriftServerIpTransfer ipTransfer;
private ThriftServerAddressReporter addressReporter;
private ServerThread serverThread;
private String configPath;
public void setService(Object service) {
this.service = service;
}
public void setPriority(Integer priority) {
this.priority = priority;
}
public void setPort(Integer port) {
this.port = port;
}
public void setIpTransfer(ThriftServerIpTransfer ipTransfer) {
this.ipTransfer = ipTransfer;
}
public void setAddressReporter(ThriftServerAddressReporter addressReporter) {
this.addressReporter = addressReporter;
}
public void setConfigPath(String configPath) {
this.configPath = configPath;
}
@Override
public void afterPropertiesSet() throws Exception {
if (ipTransfer == null) {
ipTransfer = new LocalNetworkIpTransfer();
}
String ip = ipTransfer.getIp();
if (ip == null) {
throw new NullPointerException("cant find server ip...");
}
String hostname = ip + ":" + port + ":" + priority;
Class serviceClass = service.getClass();
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Class<?>[] interfaces = serviceClass.getInterfaces();
if (interfaces.length == 0) {
throw new IllegalClassFormatException("service-class should implements Iface");
}
// reflect,load "Processor";
Processor processor = null;
for (Class clazz : interfaces) {
String cname = clazz.getSimpleName();
if (!cname.equals("Iface")) {
continue;
}
String pname = clazz.getEnclosingClass().getName() + "$Processor";
try {
Class pclass = classLoader.loadClass(pname);
if (!pclass.isAssignableFrom(Processor.class)) {
continue;
}
Constructor constructor = pclass.getConstructor(clazz);
processor = (Processor) constructor.newInstance(service);
break;
} catch (Exception e) {
//
}
}
if (processor == null) {
throw new IllegalClassFormatException("service-class should implements Iface");
}
//需要单独的线程,因为serve方法是阻塞的.
serverThread = new ServerThread(processor, port);
serverThread.start();
// report
if (addressReporter != null) {
addressReporter.report(configPath, hostname);
}
}
class ServerThread extends Thread {
private TServer server;
ServerThread(Processor processor, int port) throws Exception {
TServerSocket serverTransport = new TServerSocket(port);
Factory portFactory = new TBinaryProtocol.Factory(true, true);
Args args = new Args(serverTransport);
args.processor(processor);
args.protocolFactory(portFactory);
server = new TThreadPoolServer(args);
}
@Override
public void run(){
try{
server.serve();
}catch(Exception e){
//
}
}
public void stopServer(){
server.stop();
}
}
public void close() {
serverThread.stopServer();
}
}
4. DynamicAddressReporter.java
在ThriftServiceServerFactory中,有个可选的属性:addressReporter, DynamicAddressReporter提供了向zookeeper注册service信息的能力,当server启动正常后,把server的IP + port发送到zookeeper中;那么此后服务消费client,就可以从zookeeper中获取server列表,并与它们建立链接(池).这样client端只需要关注zookeeper的节点名称即可,不需要配置大量的ip+port.
Java代码
public class DynamicAddressReporter implements ThriftServerAddressReporter {
private CuratorFramework zookeeper;
public DynamicAddressReporter(){}
public DynamicAddressReporter(CuratorFramework zookeeper){
this.zookeeper = zookeeper;
}
public void setZookeeper(CuratorFramework zookeeper) {
this.zookeeper = zookeeper;
}
@Override
public void report(String service, String address) throws Exception {
if(zookeeper.getState() == CuratorFrameworkState.LATENT){
zookeeper.start();
zookeeper.newNamespaceAwareEnsurePath(service);
}
zookeeper.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(service +"/i_",address.getBytes("utf-8"));
}
public void close(){
zookeeper.close();
}
}
5. 测试类
Java代码
public class ServiceMain {
/**
* @param args
*/
public static void main(String[] args) {
try {
ApplicationContext context = new ClassPathXmlApplicationContext("spring-thrift-server.xml");
Thread.sleep(3000000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
本文就不在展示如何使用thrift文件生成service API的过程,请参考[Thrift简介]
Thrift-client端代码开发与配合,请参见[Thrift-client]
更多代码,请参考附件.
相关文章推荐
- Java 多态机制专题
- [转载] Thrift-client与spring集成
- Spring官网改版后下载
- Java的向下转型和向上转型专题
- Java NIO (一)
- Java中比较容易混淆的知识点
- Java 开源工具包
- JProfiler入门使用教程:Eclipse集成
- 多线程结果集合并demo【实例demo练习】
- Spring 的IOC和AOP
- java通过JDBC链接SQLServer2012
- java 引用类型
- Eclipse Xml编译错误Referenced file contains errors - spring-beans-4.0.xsd
- 关于Eclipse中 Android sdk 不能更新解决方案
- Eclipse 常用快捷键 (动画讲解)
- java基础--初始化
- springMvc4+hibernate4+activiti5.15(Maven)
- spring boot 使用h2数据库配置(内存模式)
- 怎样在spring初始化完成后执行一些操作?
- List集合拆分