微服务实现请求合并(Hystrix)
目录
3.3 Consumer侧继承HystrixCollapser
1 请求合并架构
Consumer服务为避免大并发请求Provider服务,造成Provider服务器压力过大,进而导致Prvoider服务雪崩,故在Consumer服务侧采取请求合并策略,将一定时间内的请求进行合并,一次访问Provider获取数据。如下图形象的描述了请求合并的机制:
2 注解方式实现HystrixCommand
Hystrix提供了两种(注解、继承)方式来实现请求合并,本章以注解的方式来优雅的实现请求合并功能。
2.1 Consumer引入POM
主要引入Hystrix的依赖
[code] <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId> </dependency>
2.2 编写请求合并代码
在BookService类中为getBook方法编写一个请求合并的代码getBookList,根据HystrixProperty属性设置将5s内的请求进行合并处理,且每次最大请求数量为200。另外,需要Provider服务支持http://localhost:9092/batchgetbook?ids=1,2 类型的请求。
[code] @Service public class BookService { @Autowired private RestTemplate restTemplate; @Autowired private LoadBalancerClient loadBalancerClient; /** * 可异步 * */ @HystrixCollapser(batchMethod = "getBookList", scope=com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL, collapserProperties={ //请求时间间隔在5s之内的请求会被合并为一个请求,为方便手动测试,将两个浏览器的请求进行合并,将时间设置得比较大,实际中 //需要设置得少,避免拉长每个请求的响应 @HystrixProperty(name="timerDelayInMilliseconds", value="5000"), //在批处理中允许的最大请求数 @HystrixProperty(name="maxRequestsInBatch",value="200"), }) public Future<Book> getBook(Integer id){ System.out.println("============"+id); return null; } @HystrixCommand (commandProperties={ @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds",value = "4000") }) public List<Book> getBookList(List<Integer> ids) { System.out.println("====================get BookList===== " + ids); ServiceInstance si=loadBalancerClient.choose("bookshop-service-provider"); StringBuffer sb=new StringBuffer(""); sb.append("http://"); sb.append(si.getHost()); sb.append(":"); sb.append(si.getPort()); sb.append("/batchgetbook"); sb.append("?ids="+StringUtils.join(ids,",")); System.out.println(sb.toString()); /** *封装返回对象为List<Book> **/ ParameterizedTypeReference<List<Book>> responseType = new ParameterizedTypeReference<List<Book>>(){}; ResponseEntity<List<Book>> result = restTemplate.exchange(sb.toString(),HttpMethod.GET, null,responseType); System.out.println("xxxxx :" +result.getStatusCode()); return result.getBody(); } }
参数解释:
请求合并参数 | |||
参数 | 作用 | 默认值 | 备注 |
@HystrixCollapser | 被@HystrixCollapser标注的方法,返回类型必须为Future,使用异步方法,否则无法进行请求合并. | ||
batchMethod | 请求合并的方法 | 方法只能接受一个参数,如果需要传递多个参数,封装成一个类参数 | |
scope | 请求范围 | REQUEST | 请求方式:分为REQUEST,GLOBAL REQUEST范围只对一个request请求内的多次服务请求进行合并。 GLOBAL是应用中的所有线程的请求中的多次服务请求进行合并. |
timeDelayinMilliseconds | 请求时间间隔在10ms之内的请求会被合并为一个请求 | 10ms | 建议尽量设置得小一点,如果并发量不大,没必要使用HystrixCollapser来处理. |
maxRequestsInBatch | 设置触发批处理执行之前,在批处理中允许的最大请求数 | Integer.MAX_VALUE |
2.3 Consumer启动类&配置
在启动类中启动Hystrix熔断和EurekaClient,启动Eureka主要是访问Provider提供的服务。
[code]@SpringBootApplication @EnableCircuitBreaker @EnableEurekaClient public class BookshopConsumerHystrixBatchApplication { @Bean public RestTemplate createRt() { return new RestTemplate(); } public static void main(String[] args) { SpringApplication.run(BookshopConsumerHystrixBatchApplication.class, args); } }
2.4 Provider服务详解
Provider服务利用Feign和Eureka搭建出业务平台。此处简单的介绍其实现:由Controller API,Provider Service组成,Controller API设定每个请求的参数和请求方式以及Entity,Provider Service实现Controller API。(具体的实现架构可参见声明式服务调用的实现(Feign)及其优化(Ribbon) )
Controller API主要代码(未包含Book Entity):本次Consumer主要调用的是List<Book> batchGetBook(@RequestParam("ids") String ids)方法。
[code]package bookshop.api.controller; import java.util.List; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import bookshop.api.entity.Book; public interface BookController { @GetMapping(value="/getbooklist") public List<Book> getBookList(); @RequestMapping(value="/get",method=RequestMethod.GET) public Book getBook(@RequestParam("id") Integer id); /** * 获取多参数的访问 * */ //@RequestMapping(value="/getbook",method=RequestMethod.GET)未引入feign httpclient @RequestMapping(value="/getbook",method=RequestMethod.GET,consumes=MediaType.APPLICATION_JSON_VALUE) public Book getBook(Book book); /** * id=1&bookName=abc&author=xiaoming * */ @RequestMapping(value="/getbook2",method=RequestMethod.GET) public Book getBook(@RequestParam("id") Integer id, @RequestParam("bookName") String bookName, @RequestParam("author") String author); @RequestMapping(value="/addbook",method=RequestMethod.POST) public Book addBook(@RequestBody Book book); /** * 用来接收批量处理的请求. * */ @RequestMapping(value="/batchgetbook", method=RequestMethod.GET) public List<Book> batchGetBook(@RequestParam("ids") String ids); }
Provider Controller具体实现上述BookController类,主要代码如下:
[code]package com.bookshop.provider.controller; import java.util.ArrayList; import java.util.Date; import java.util.List; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import bookshop.api.controller.BookController; import bookshop.api.entity.Book; @RestController public class BookControllerImp implements BookController { @Override public List<Book> getBookList() { System.out.println("======================"+new Date()); try { Thread.sleep(6000); } catch (InterruptedException e) { e.printStackTrace(); } List<Book> list = new ArrayList<Book>(); Book book = new Book(); book.setAuthor("小明"); book.setBookName("小明自传"); list.add(book); return list; } @Override public Book getBook(Integer id) { Book book = new Book(); book.setId(id); book.setAuthor("小明"); book.setBookName("小明自传"); return book; } @Override //public Book getBook(Book book) { public Book getBook(@RequestBody Book book) { return book; } @Override public Book getBook(Integer id, String bookName, String author) { Book book = new Book(); book.setId(id); book.setAuthor(author); book.setBookName(bookName); return book; } @Override public Book addBook(@RequestBody Book book) { return book; } @Override public List<Book> batchGetBook(String ids) { List<Book> bookList = new ArrayList<Book>(); String idArray[] = ids.split(","); for (int i=0; i<idArray.length; i++) { Book book = new Book(); book.setId(Integer.valueOf(idArray[i])); book.setAuthor("Author" + idArray[i]); book.setBookName("BookName"+idArray[i]); bookList.add(book); } return bookList; } }
3 继承方式实现HystrixCommand
3.1 Consumer侧服务层支持批量请求
需要提供支持批量访问Provider Service接口的服 20000 务层,用来向Provider Service发送批量参数。如http://localhost:9092/batchgetbook?ids=2,3发送批量参数ids给provider接口。具体实现如下:
[code]package com.bookshop.consumer.batch.service; import java.util.List; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.loadbalancer.LoadBalancerClient; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpMethod; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; import bookshop.api.entity.Book; @Service public class BookBatchCommandService { @Autowired private RestTemplate restTemplate; @Autowired private LoadBalancerClient loadBalancerClient; public List<Book> getBookList(List<Integer> ids) { System.out.println("====================get BookList===== " + ids); ServiceInstance si=loadBalancerClient.choose("bookshop-service-provider"); StringBuffer sb=new StringBuffer(""); sb.append("http://"); sb.append(si.getHost()); sb.append(":"); sb.append(si.getPort()); sb.append("/batchgetbook"); sb.append("?ids="+StringUtils.join(ids,",")); System.out.println(sb.toString()); /** *封装返回对象为List<Book> **/ ParameterizedTypeReference<List<Book>> responseType = new ParameterizedTypeReference<List<Book>>(){}; ResponseEntity<List<Book>> result = restTemplate.exchange(sb.toString(),HttpMethod.GET, null,responseType); System.out.println("xxxxx :" +result.getStatusCode()); return result.getBody(); } }
3.2 Consumer侧继承HystrixCommand
利用继承HystrixCommand方式实现熔断,其run方法调用BookBatchCommandService中的批量访问Provider方法,具体代码如下清单:
[code]package com.bookshop.consumer.batch.command; import java.util.List; import com.bookshop.consumer.batch.service.BookBatchCommandService; import com.netflix.hystrix.HystrixCommand; import com.netflix.hystrix.HystrixCommandGroupKey; import com.netflix.hystrix.HystrixCommandProperties; import bookshop.api.entity.Book; public class BookBatchCommand extends HystrixCommand<List<Book>> { //List<Book>为返回的数据类型 private List<Integer> ids; private BookBatchCommandService bookcmdservice; protected BookBatchCommand(BookBatchCommandService bookcmdservice, List<Integer> ids) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("BookBatchCommand")) .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationThreadTimeoutInMilliseconds(4000))); this.ids = ids; this.bookcmdservice = bookcmdservice; } @Override protected List<Book> run() throws Exception { return bookcmdservice.getBookList(ids); //调用BookBatchCommandService中的getBookList()方法 } }
3.3 Consumer侧继承HystrixCollapser
用继承HystrixCollapser方式实现将一定时间窗口内的多个单独请求装载成一个批量请求并将批量请求获取的返回数据拆分到每个单独的请求中。其具体实现如下代码清单以及注释说明:
[code]package com.bookshop.consumer.batch.command; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.stream.Collectors; import bookshop.api.entity.Book; import com.bookshop.consumer.batch.service.BookBatchCommandService; import com.netflix.hystrix.HystrixCollapser; import com.netflix.hystrix.HystrixCollapserKey; import com.netflix.hystrix.HystrixCollapserProperties; import com.netflix.hystrix.HystrixCommand; public class BookCommandCollapser extends HystrixCollapser<List<Book>,Book,Integer> { //三个参数,分别是批量的数据返回类型,单个请求的返回类型,单个请求的参数类型 private Integer bookId; private BookBatchCommandService bookcmdservice; public BookCommandCollapser(BookBatchCommandService bookcmdservice, Integer bookId) { super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("BookCommandCollapser")) .andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(5000)) //为请求合并器设置了时间延迟属性,合并器会在该时间窗内收集获取单个Book的请求并在时间窗结束时进行合并组装成一个批量请求 .andScope(Scope.GLOBAL)); //global context 作用于容器内所有调用线程,对一个依赖服务的任何一个command调用都被合并在一起,hystrix就传递一个HystrixRequestContext //user request context 作用于容器内某一个调用线程,将某一个线程对某个依赖服务的多个command调用合并在一起 //默认是REQUEST,就是不会跨越多个请求会话的,只在当前用户请求中合并多次请求为批处理请求 this.bookId = bookId; this.bookcmdservice = bookcmdservice; } /** * 返回单个请求的参数booId * */ @Override public Integer getRequestArgument() { return bookId; } /** * CollapsedRequest参数中保存了延迟时间窗中收集到的所有获取单个Book的请求。 * 通过获取这些请求的参数来组织批量请求命令 **/ @Override protected HystrixCommand<List<Book>> createCommand( Collection<com.netflix.hystrix.HystrixCollapser.CollapsedRequest<Book, Integer>> requests) { List<Integer> Ids = new ArrayList<>(requests.size()); Ids.addAll(requests.stream().map(CollapsedRequest::getArgument).collect(Collectors.toList())); System.out.println(Ids.toString() +" " + Ids.size()); return new BookBatchCommand(bookcmdservice,Ids); } /** *通过遍历批量结果batchResponse对象,为CollapsedRequest中每个合并前的单个请求设置返回结果 *以此完成批量结果到单个请求结果的转换。 **/ @Override protected void mapResponseToRequests( List<Book> batchResponse, Collection<com.netflix.hystrix.HystrixCollapser.CollapsedRequest<Book, Integer>> requests) { int count = 0; for(com.netflix.hystrix.HystrixCollapser.CollapsedRequest<Book, Integer> collapsedRequest : requests) { Book book = batchResponse.get(count++); collapsedRequest.setResponse(book); } } }
3.4 Consumer侧Controller提供调用接口
Controller侧调用上述实现的BookCommandCollapser类,该类必须new一个实例,不能是单例。
[code]package com.bookshop.consumer.batch.controller; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import bookshop.api.entity.Book; import com.bookshop.consumer.batch.command.BookCommandCollapser; import com.bookshop.consumer.batch.service.BookBatchCommandService; import com.bookshop.consumer.batch.service.BookService; import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext; @RestController public class BookController { @Autowired private BookService bookservice; @Autowired private BookBatchCommandService bookcmdservice; @RequestMapping(value="getBook",method=RequestMethod.GET) public Book batchGetBook(@RequestParam Integer id) { try { return bookservice.getBook(id).get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); return null; } } @RequestMapping(value="getBook2",method=RequestMethod.GET) public Book getBook(@RequestParam Integer id) { HystrixRequestContext context = HystrixRequestContext.initializeContext(); //需要初始化Context,否则可能报错... Future<Book> f1= new BookCommandCollapser(bookcmdservice,id).queue(); //Future<Book> f2= new BookCommandCollapser(bookcmdservice,2).queue(); //Future<Book> f3= new BookCommandCollapser(bookcmdservice,3).queue(); //上述注解的f2,f3是在HystrixCollapser的scope=REQUEST情况下做的测试,即:不会跨越多个请求会话的,只在当前用户请求中合并多次请求为批处理请求 try { //f2.get(); //f3.get(); return f1.get(); } catch (InterruptedException | ExecutionException e) { // TODO Auto-generated catch block return null; } finally{ context.close(); } } }
3.5 测试
运行Provider,Eureku,Consumer,在窗口时间内用浏览器发送两个请求(http://localhost:9099/getBook2?id=1和http://localhost:9099/getBook2?id=2),后台窗口会打印出如下消息,说明请求进行了合并。
- OpenLayers跨域请求WFS服务在Tomcat环境下的实现cgi
- ASP.NET WebApi服务接口如何防止重复请求实现HTTP幂等性(八)
- 用一个WEB服务或普通站点 实现这样一个效果?以URL请求,返回一个XML文档
- webpack4+node合并资源请求, 实现combo功能(二十三)
- 使用Hystrix实现微服务的容错处理
- 微服务开发架构——Spring Cloud常见问题与总结<二>Hystrix/Feign 整合Hystrix后首次请求失败
- 二、Hystrix隔离服务的实现原理---线程池隔离
- 高性能服务通信框架Gaea的详细实现--server请求处理流程
- 实现在GET请求下调用WCF服务时传递对象(复合类型)参数
- Hystrix:HystrixCollapser请求合并
- Hystrix请求合并
- 在线程中实现请求网络服务,抛出了IOException异常。
- 使用Hystrix实现微服务的容错处理
- springCloud(12):使用Hystrix实现微服务的容错处理-Hystrix的监控
- 学习笔记:微服务12 spring cloud Feign(Rest请求)+ hystrix(熔断)
- Spring Cloud Hystrix的请求合并
- SpringCloud系列:服务注册与发现、负责均衡、hystrix服务降级的实现
- 使用Hystrix实现微服务的容错处理
- 使用Hystrix实现微服务的容错处理
- git http服务免登录实现(免去每次请求用户名密码输入,Visual Studio可用)