您的位置:首页 > 其它

微服务实现请求合并(Hystrix)

2019-01-26 23:02 561 查看

目录

1 请求合并架构

2 注解方式实现HystrixCommand

2.1 Consumer引入POM

2.2 编写请求合并代码

 2.3 Consumer启动类&配置

2.4 Provider服务详解

3 继承方式实现HystrixCommand

3.1 Consumer侧服务层支持批量请求

3.2 Consumer侧继承HystrixCommand

3.3 Consumer侧继承HystrixCollapser

3.4 Consumer侧Controller提供调用接口

3.5 测试

1 请求合并架构

Consumer服务为避免大并发请求Provider服务,造成Provider服务器压力过大,进而导致Prvoider服务雪崩,故在Consumer服务侧采取请求合并策略,将一定时间内的请求进行合并,一次访问Provider获取数据。如下图形象的描述了请求合并的机制:

2 注解方式实现HystrixCommand

Hystrix提供了两种(注解、继承)方式来实现请求合并,本章以注解的方式来优雅的实现请求合并功能。

2.1 Consumer引入POM

主要引入Hystrix的依赖

[code]                <dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>

2.2 编写请求合并代码

在BookService类中为getBook方法编写一个请求合并的代码getBookList,根据HystrixProperty属性设置将5s内的请求进行合并处理,且每次最大请求数量为200。另外,需要Provider服务支持http://localhost:9092/batchgetbook?ids=1,2  类型的请求。

[code]
@Service
public class BookService {

@Autowired
private RestTemplate restTemplate;

@Autowired
private LoadBalancerClient loadBalancerClient;

/**
* 可异步
* */
@HystrixCollapser(batchMethod = "getBookList", scope=com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,
collapserProperties={
//请求时间间隔在5s之内的请求会被合并为一个请求,为方便手动测试,将两个浏览器的请求进行合并,将时间设置得比较大,实际中
//需要设置得少,避免拉长每个请求的响应
@HystrixProperty(name="timerDelayInMilliseconds", value="5000"),
//在批处理中允许的最大请求数
@HystrixProperty(name="maxRequestsInBatch",value="200"),
})
public Future<Book> getBook(Integer id){
System.out.println("============"+id);
return null;
}

@HystrixCommand (commandProperties={
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds",value = "4000")
})
public List<Book> getBookList(List<Integer> ids) {
System.out.println("====================get BookList===== " + ids);

ServiceInstance si=loadBalancerClient.choose("bookshop-service-provider");
StringBuffer sb=new StringBuffer("");
sb.append("http://");
sb.append(si.getHost());
sb.append(":");
sb.append(si.getPort());
sb.append("/batchgetbook");
sb.append("?ids="+StringUtils.join(ids,","));
System.out.println(sb.toString());

/**
*封装返回对象为List<Book>
**/
ParameterizedTypeReference<List<Book>> responseType = new ParameterizedTypeReference<List<Book>>(){};
ResponseEntity<List<Book>> result = restTemplate.exchange(sb.toString(),HttpMethod.GET, null,responseType);
System.out.println("xxxxx :" +result.getStatusCode());
return result.getBody();

}

}

参数解释:

请求合并参数
参数 作用 默认值 备注
@HystrixCollapser     被@HystrixCollapser标注的方法,返回类型必须为Future,使用异步方法,否则无法进行请求合并.
batchMethod 请求合并的方法   方法只能接受一个参数,如果需要传递多个参数,封装成一个类参数
scope 请求范围 REQUEST 请求方式:分为REQUEST,GLOBAL
REQUEST范围只对一个request请求内的多次服务请求进行合并。
GLOBAL是应用中的所有线程的请求中的多次服务请求进行合并.
timeDelayinMilliseconds 请求时间间隔在10ms之内的请求会被合并为一个请求 10ms 建议尽量设置得小一点,如果并发量不大,没必要使用HystrixCollapser来处理.
maxRequestsInBatch 设置触发批处理执行之前,在批处理中允许的最大请求数 Integer.MAX_VALUE  

 2.3 Consumer启动类&配置

在启动类中启动Hystrix熔断和EurekaClient,启动Eureka主要是访问Provider提供的服务。

[code]@SpringBootApplication
@EnableCircuitBreaker
@EnableEurekaClient

public class BookshopConsumerHystrixBatchApplication {

@Bean
public RestTemplate createRt() {
return new RestTemplate();
}

public static void main(String[] args) {
SpringApplication.run(BookshopConsumerHystrixBatchApplication.class, args);
}

}

2.4 Provider服务详解

Provider服务利用Feign和Eureka搭建出业务平台。此处简单的介绍其实现:由Controller API,Provider Service组成,Controller API设定每个请求的参数和请求方式以及Entity,Provider Service实现Controller API。(具体的实现架构可参见声明式服务调用的实现(Feign)及其优化(Ribbon)

Controller API主要代码(未包含Book Entity):本次Consumer主要调用的是List<Book> batchGetBook(@RequestParam("ids") String ids)方法。

[code]package bookshop.api.controller;

import java.util.List;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;

import bookshop.api.entity.Book;

public interface BookController {

@GetMapping(value="/getbooklist")
public List<Book> getBookList();

@RequestMapping(value="/get",method=RequestMethod.GET)
public Book getBook(@RequestParam("id") Integer id);

/**
* 获取多参数的访问
* */
//@RequestMapping(value="/getbook",method=RequestMethod.GET)未引入feign httpclient
@RequestMapping(value="/getbook",method=RequestMethod.GET,consumes=MediaType.APPLICATION_JSON_VALUE)
public Book getBook(Book book);

/**
* id=1&bookName=abc&author=xiaoming
* */
@RequestMapping(value="/getbook2",method=RequestMethod.GET)
public Book getBook(@RequestParam("id") Integer id, @RequestParam("bookName") String bookName, @RequestParam("author") String author);

@RequestMapping(value="/addbook",method=RequestMethod.POST)
public Book addBook(@RequestBody Book book);

/**
* 用来接收批量处理的请求.
* */
@RequestMapping(value="/batchgetbook", method=RequestMethod.GET)
public List<Book> batchGetBook(@RequestParam("ids") String ids);
}

Provider Controller具体实现上述BookController类,主要代码如下:

[code]package com.bookshop.provider.controller;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import bookshop.api.controller.BookController;
import bookshop.api.entity.Book;

@RestController
public class BookControllerImp implements BookController {

@Override
public List<Book> getBookList() {
System.out.println("======================"+new Date());
try {
Thread.sleep(6000);
} catch (InterruptedException e) {

e.printStackTrace();
}

List<Book> list =  new ArrayList<Book>();
Book book = new Book();
book.setAuthor("小明");
book.setBookName("小明自传");
list.add(book);
return list;
}

@Override
public Book getBook(Integer id) {
Book book = new Book();
book.setId(id);
book.setAuthor("小明");
book.setBookName("小明自传");
return book;
}

@Override
//public Book getBook(Book book) {
public Book getBook(@RequestBody Book book) {
return book;
}

@Override
public Book getBook(Integer id, String bookName, String author) {
Book book = new Book();
book.setId(id);
book.setAuthor(author);
book.setBookName(bookName);
return book;
}

@Override
public Book addBook(@RequestBody Book book) {
return book;
}

@Override
public List<Book> batchGetBook(String ids) {

List<Book> bookList = new ArrayList<Book>();
String idArray[] = ids.split(",");
for (int i=0; i<idArray.length; i++) {
Book book = new Book();
book.setId(Integer.valueOf(idArray[i]));
book.setAuthor("Author" + idArray[i]);
book.setBookName("BookName"+idArray[i]);
bookList.add(book);
}
return bookList;
}

}

3 继承方式实现HystrixCommand

3.1 Consumer侧服务层支持批量请求

需要提供支持批量访问Provider Service接口的服 20000 务层,用来向Provider Service发送批量参数。如http://localhost:9092/batchgetbook?ids=2,3发送批量参数ids给provider接口。具体实现如下:

[code]package com.bookshop.consumer.batch.service;

import java.util.List;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.LoadBalancerClient;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import bookshop.api.entity.Book;

@Service
public class BookBatchCommandService {

@Autowired
private RestTemplate restTemplate;

@Autowired
private LoadBalancerClient loadBalancerClient;

public List<Book> getBookList(List<Integer> ids) {
System.out.println("====================get BookList===== " + ids);

ServiceInstance si=loadBalancerClient.choose("bookshop-service-provider");
StringBuffer sb=new StringBuffer("");
sb.append("http://");
sb.append(si.getHost());
sb.append(":");
sb.append(si.getPort());
sb.append("/batchgetbook");
sb.append("?ids="+StringUtils.join(ids,","));
System.out.println(sb.toString());

/**
*封装返回对象为List<Book>
**/
ParameterizedTypeReference<List<Book>> responseType = new ParameterizedTypeReference<List<Book>>(){};
ResponseEntity<List<Book>> result = restTemplate.exchange(sb.toString(),HttpMethod.GET, null,responseType);
System.out.println("xxxxx :" +result.getStatusCode());
return result.getBody();

}

}

3.2 Consumer侧继承HystrixCommand

利用继承HystrixCommand方式实现熔断,其run方法调用BookBatchCommandService中的批量访问Provider方法,具体代码如下清单:

[code]package com.bookshop.consumer.batch.command;

import java.util.List;

import com.bookshop.consumer.batch.service.BookBatchCommandService;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;

import bookshop.api.entity.Book;

public class BookBatchCommand extends HystrixCommand<List<Book>> {    //List<Book>为返回的数据类型

private List<Integer> ids;
private BookBatchCommandService bookcmdservice;

protected BookBatchCommand(BookBatchCommandService bookcmdservice, List<Integer> ids) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("BookBatchCommand"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationThreadTimeoutInMilliseconds(4000)));

this.ids = ids;
this.bookcmdservice = bookcmdservice;

}

@Override
protected List<Book> run() throws Exception {
return bookcmdservice.getBookList(ids); //调用BookBatchCommandService中的getBookList()方法
}

}

3.3 Consumer侧继承HystrixCollapser

用继承HystrixCollapser方式实现将一定时间窗口内的多个单独请求装载成一个批量请求并将批量请求获取的返回数据拆分到每个单独的请求中。其具体实现如下代码清单以及注释说明:

[code]package com.bookshop.consumer.batch.command;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

import bookshop.api.entity.Book;

import com.bookshop.consumer.batch.service.BookBatchCommandService;
import com.netflix.hystrix.HystrixCollapser;
import com.netflix.hystrix.HystrixCollapserKey;
import com.netflix.hystrix.HystrixCollapserProperties;
import com.netflix.hystrix.HystrixCommand;

public class BookCommandCollapser extends HystrixCollapser<List<Book>,Book,Integer> {  //三个参数,分别是批量的数据返回类型,单个请求的返回类型,单个请求的参数类型

private Integer bookId;
private BookBatchCommandService bookcmdservice;

public BookCommandCollapser(BookBatchCommandService bookcmdservice, Integer bookId) {

super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("BookCommandCollapser"))
.andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(5000))
//为请求合并器设置了时间延迟属性,合并器会在该时间窗内收集获取单个Book的请求并在时间窗结束时进行合并组装成一个批量请求
.andScope(Scope.GLOBAL));
//global context  作用于容器内所有调用线程,对一个依赖服务的任何一个command调用都被合并在一起,hystrix就传递一个HystrixRequestContext
//user request context 作用于容器内某一个调用线程,将某一个线程对某个依赖服务的多个command调用合并在一起
//默认是REQUEST,就是不会跨越多个请求会话的,只在当前用户请求中合并多次请求为批处理请求

this.bookId = bookId;
this.bookcmdservice = bookcmdservice;

}

/**
* 返回单个请求的参数booId
* */
@Override
public Integer getRequestArgument() {

return bookId;
}

/**
* CollapsedRequest参数中保存了延迟时间窗中收集到的所有获取单个Book的请求。
* 通过获取这些请求的参数来组织批量请求命令
**/
@Override
protected HystrixCommand<List<Book>> createCommand(
Collection<com.netflix.hystrix.HystrixCollapser.CollapsedRequest<Book, Integer>> requests) {
List<Integer> Ids = new ArrayList<>(requests.size());
Ids.addAll(requests.stream().map(CollapsedRequest::getArgument).collect(Collectors.toList()));
System.out.println(Ids.toString()  +"   " + Ids.size());
return new BookBatchCommand(bookcmdservice,Ids);
}

/**
*通过遍历批量结果batchResponse对象,为CollapsedRequest中每个合并前的单个请求设置返回结果
*以此完成批量结果到单个请求结果的转换。
**/
@Override
protected void mapResponseToRequests(
List<Book> batchResponse,
Collection<com.netflix.hystrix.HystrixCollapser.CollapsedRequest<Book, Integer>> requests) {
int count = 0;
for(com.netflix.hystrix.HystrixCollapser.CollapsedRequest<Book, Integer> collapsedRequest : requests) {
Book book = batchResponse.get(count++);
collapsedRequest.setResponse(book);

}
}
}

3.4 Consumer侧Controller提供调用接口

Controller侧调用上述实现的BookCommandCollapser类,该类必须new一个实例,不能是单例。

[code]package com.bookshop.consumer.batch.controller;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import bookshop.api.entity.Book;

import com.bookshop.consumer.batch.command.BookCommandCollapser;
import com.bookshop.consumer.batch.service.BookBatchCommandService;
import com.bookshop.consumer.batch.service.BookService;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;

@RestController
public class BookController {

@Autowired
private BookService bookservice;

@Autowired
private BookBatchCommandService bookcmdservice;

@RequestMapping(value="getBook",method=RequestMethod.GET)
public Book batchGetBook(@RequestParam Integer id) {

try {
return bookservice.getBook(id).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
return null;
}
}

@RequestMapping(value="getBook2",method=RequestMethod.GET)
public Book getBook(@RequestParam Integer id) {
HystrixRequestContext context = HystrixRequestContext.initializeContext();   //需要初始化Context,否则可能报错...
Future<Book> f1= new BookCommandCollapser(bookcmdservice,id).queue();
//Future<Book> f2= new BookCommandCollapser(bookcmdservice,2).queue();
//Future<Book> f3= new BookCommandCollapser(bookcmdservice,3).queue();
//上述注解的f2,f3是在HystrixCollapser的scope=REQUEST情况下做的测试,即:不会跨越多个请求会话的,只在当前用户请求中合并多次请求为批处理请求
try {
//f2.get();
//f3.get();
return f1.get();
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
return null;
} finally{
context.close();
}
}

}

3.5 测试

运行Provider,Eureku,Consumer,在窗口时间内用浏览器发送两个请求(http://localhost:9099/getBook2?id=1http://localhost:9099/getBook2?id=2),后台窗口会打印出如下消息,说明请求进行了合并。

 

 

 

 

 

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: