您的位置:首页 > 编程语言 > Java开发

[转载] Thrift-server与spring集成

2015-10-28 23:49 190 查看
转载自http://shift-alt-ctrl.iteye.com/blog/1990026

Thrift服务server端,其实就是一个ServerSocket线程 + 处理器,当Thrift-client端建立链接之后,处理器负责解析socket流信息,并根据其指定的"方法名"+参数列表,来调用"服务实现类"的方法,并将执行结果(或者异常)写入到socket中.

一个server,就需要创建一个ServerSocket,并侦听本地的一个端口,这种情况对分布式部署,有一些额外的要求:client端需要知道一个"service"被部署在了那些server上.

设计思路:

1) 每个server内部采用threadPool的方式,来提升并发能力.

2) 当server启动成功后,向zookeeper注册服务节点,此后client端就可以"感知到"服务的状态

3) 通过spring的方式,配置thrift-server服务类.

其中zookeepeer注册是可选选项

1.pom.xml

Java代码


<dependencies>

<dependency>

<groupId>org.springframework</groupId>

<artifactId>spring-context</artifactId>

<version>3.0.7.RELEASE</version>

</dependency>

<dependency>

<groupId>org.apache.zookeeper</groupId>

<artifactId>zookeeper</artifactId>

<version>3.4.5</version>

<!--<exclusions>-->

<!--<exclusion>-->

<!--<groupId>log4j</groupId>-->

<!--<artifactId>log4j</artifactId>-->

<!--</exclusion>-->

<!--</exclusions>-->

</dependency>

<!--

<dependency>

<groupId>com.101tec</groupId>

<artifactId>zkclient</artifactId>

<version>0.4</version>

</dependency>

-->

<dependency>

<groupId>org.apache.thrift</groupId>

<artifactId>libthrift</artifactId>

<version>0.9.1</version>

</dependency>

<dependency>

<groupId>org.apache.curator</groupId>

<artifactId>curator-recipes</artifactId>

<version>2.3.0</version>

</dependency>

<dependency>

<groupId>commons-pool</groupId>

<artifactId>commons-pool</artifactId>

<version>1.6</version>

</dependency>

</dependencies>

本实例,使用了apache-curator作为zookeeper客户端.

2. spring-thrift-server.xml

Java代码


<!-- zookeeper -->

<bean id="thriftZookeeper" class="com.demo.thrift.zookeeper.ZookeeperFactory" destroy-method="close">

<property name="connectString" value="127.0.0.1:2181"></property>

<property name="namespace" value="demo/thrift-service"></property>

</bean>

<bean id="sericeAddressReporter" class="com.demo.thrift.support.impl.DynamicAddressReporter" destroy-method="close">

<property name="zookeeper" ref="thriftZookeeper"></property>

</bean>

<bean id="userService" class="com.demo.service.UserServiceImpl"/>

<bean class="com.demo.thrift.ThriftServiceServerFactory" destroy-method="close">

<property name="service" ref="userService"></property>

<property name="configPath" value="UserServiceImpl"></property>

<property name="port" value="9090"></property>

<property name="addressReporter" ref="sericeAddressReporter"></property>

</bean>

3. ThriftServiceServerFactory.java

此类严格上说并不是一个工厂类,它的主要作用就是封装指定的"service" ,然后启动一个server的过程,其中"service"属性表示服务的实现类,addressReporter表示当server启动成功后,需要指定的操作(比如,向zookeeper发送service的IP信息).

究竟当前server的ip地址是多少,在不同的设计中,有所不同,比如:有些管理员喜欢将本机的IP地址写入到os下的某个文件中,如果上层应用需要获取可靠的IP信息,就需要读取这个文件...你可以实现自己的ThriftServerIpTransfer来获取当前server的IP.

为了减少xml中的配置信息,在factory中,使用了反射机制来构建"Processor"类.

Java代码


public class ThriftServiceServerFactory implements InitializingBean {

private Integer port;

private Integer priority = 1;// default

private Object service;// serice实现类

private ThriftServerIpTransfer ipTransfer;

private ThriftServerAddressReporter addressReporter;

private ServerThread serverThread;

private String configPath;

public void setService(Object service) {

this.service = service;

}

public void setPriority(Integer priority) {

this.priority = priority;

}

public void setPort(Integer port) {

this.port = port;

}

public void setIpTransfer(ThriftServerIpTransfer ipTransfer) {

this.ipTransfer = ipTransfer;

}

public void setAddressReporter(ThriftServerAddressReporter addressReporter) {

this.addressReporter = addressReporter;

}

public void setConfigPath(String configPath) {

this.configPath = configPath;

}

@Override

public void afterPropertiesSet() throws Exception {

if (ipTransfer == null) {

ipTransfer = new LocalNetworkIpTransfer();

}

String ip = ipTransfer.getIp();

if (ip == null) {

throw new NullPointerException("cant find server ip...");

}

String hostname = ip + ":" + port + ":" + priority;

Class serviceClass = service.getClass();

ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

Class<?>[] interfaces = serviceClass.getInterfaces();

if (interfaces.length == 0) {

throw new IllegalClassFormatException("service-class should implements Iface");

}

// reflect,load "Processor";

Processor processor = null;

for (Class clazz : interfaces) {

String cname = clazz.getSimpleName();

if (!cname.equals("Iface")) {

continue;

}

String pname = clazz.getEnclosingClass().getName() + "$Processor";

try {

Class pclass = classLoader.loadClass(pname);

if (!pclass.isAssignableFrom(Processor.class)) {

continue;

}

Constructor constructor = pclass.getConstructor(clazz);

processor = (Processor) constructor.newInstance(service);

break;

} catch (Exception e) {

//

}

}

if (processor == null) {

throw new IllegalClassFormatException("service-class should implements Iface");

}

//需要单独的线程,因为serve方法是阻塞的.

serverThread = new ServerThread(processor, port);

serverThread.start();

// report

if (addressReporter != null) {

addressReporter.report(configPath, hostname);

}

}

class ServerThread extends Thread {

private TServer server;

ServerThread(Processor processor, int port) throws Exception {

TServerSocket serverTransport = new TServerSocket(port);

Factory portFactory = new TBinaryProtocol.Factory(true, true);

Args args = new Args(serverTransport);

args.processor(processor);

args.protocolFactory(portFactory);

server = new TThreadPoolServer(args);

}

@Override

public void run(){

try{

server.serve();

}catch(Exception e){

//

}

}

public void stopServer(){

server.stop();

}

}

public void close() {

serverThread.stopServer();

}

}

4. DynamicAddressReporter.java

在ThriftServiceServerFactory中,有个可选的属性:addressReporter, DynamicAddressReporter提供了向zookeeper注册service信息的能力,当server启动正常后,把server的IP + port发送到zookeeper中;那么此后服务消费client,就可以从zookeeper中获取server列表,并与它们建立链接(池).这样client端只需要关注zookeeper的节点名称即可,不需要配置大量的ip+port.

Java代码


public class DynamicAddressReporter implements ThriftServerAddressReporter {

private CuratorFramework zookeeper;

public DynamicAddressReporter(){}

public DynamicAddressReporter(CuratorFramework zookeeper){

this.zookeeper = zookeeper;

}

public void setZookeeper(CuratorFramework zookeeper) {

this.zookeeper = zookeeper;

}

@Override

public void report(String service, String address) throws Exception {

if(zookeeper.getState() == CuratorFrameworkState.LATENT){

zookeeper.start();

zookeeper.newNamespaceAwareEnsurePath(service);

}

zookeeper.create()

.creatingParentsIfNeeded()

.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)

.forPath(service +"/i_",address.getBytes("utf-8"));

}

public void close(){

zookeeper.close();

}

}

5. 测试类

Java代码


public class ServiceMain {

/**

* @param args

*/

public static void main(String[] args) {

try {

ApplicationContext context = new ClassPathXmlApplicationContext("spring-thrift-server.xml");

Thread.sleep(3000000);

} catch (Exception e) {

e.printStackTrace();

}

}

}

本文就不在展示如何使用thrift文件生成service API的过程,请参考[Thrift简介]

Thrift-client端代码开发与配合,请参见[Thrift-client]

更多代码,请参考附件.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: