您的位置:首页 > 编程语言 > Java开发

Spring Cloud 升级之路 - 2020.0.x - 7. 使用 Spring Cloud LoadBalancer (2)

2021-05-29 13:37 190 查看 https://www.cnblogs.com/zhxdic

本项目代码地址:https://github.com/HashZhang/spring-cloud-scaffold/tree/master/spring-cloud-iiford

我们使用 Spring Cloud 官方推荐的 Spring Cloud LoadBalancer 作为我们的客户端负载均衡器。上一节我们了解了 Spring Cloud LoadBalancer 的结构,接下来我们来说一下我们在使用 Spring Cloud LoadBalancer 要实现的功能:

  • 我们要实现不同集群之间不互相调用,通过实例的
    metamap
    中的
    zone
    配置
    ,来区分不同集群的实例。只有实例的
    metamap
    中的
    zone
    配置一样的实例才能互相调用。这个通过实现自定义的
    ServiceInstanceListSupplier
    即可实现
  • 负载均衡的轮询算法,需要请求与请求之间隔离,不能共用同一个 position 导致某个请求失败之后的重试还是原来失败的实例。上一节看到的默认的
    RoundRobinLoadBalancer
    是所有线程共用同一个原子变量
    position
    每次请求原子加 1。在这种情况下会有问题:假设有微服务 A 有两个实例:实例 1 和实例 2。请求 A 到达时,
    RoundRobinLoadBalancer
    返回实例 1,这时有请求 B 到达,
    RoundRobinLoadBalancer
    返回实例 2。然后如果请求 A 失败重试,
    RoundRobinLoadBalancer
    又返回了实例 1。这不是我们期望看到的。
  • 针对这两个功能,我们分别编写自己的实现。

    实现不同集群不互相调用

    Spring Cloud LoadBalancer 中的 zone 配置

    Spring Cloud LoadBalancer 定义了

    LoadBalancerZoneConfig

    public class LoadBalancerZoneConfig {
    //标识当前负载均衡器处于哪一个 zone
    private String zone;
    public LoadBalancerZoneConfig(String zone) {
    this.zone = zone;
    }
    public String getZone() {
    return zone;
    }
    public void setZone(String zone) {
    this.zone = zone;
    }
    }

    如果没有引入 Eureka 相关依赖,则这个 zone 通过

    spring.cloud.loadbalancer.zone
    配置:
    LoadBalancerAutoConfiguration

    @Bean
    @ConditionalOnMissingBean
    public LoadBalancerZoneConfig zoneConfig(Environment environment) {
    return new LoadBalancerZoneConfig(environment.getProperty("spring.cloud.loadbalancer.zone"));
    }

    如果引入了 Eureka 相关依赖,则如果在 Eureka 元数据配置了 zone,则这个 zone 会覆盖 Spring Cloud LoadBalancer 中的

    LoadBalancerZoneConfig

    EurekaLoadBalancerClientConfiguration

    @PostConstruct
    public void postprocess() {
    if (!StringUtils.isEmpty(zoneConfig.getZone())) {
    return;
    }
    String zone = getZoneFromEureka();
    if (!StringUtils.isEmpty(zone)) {
    if (LOG.isDebugEnabled()) {
    LOG.debug("Setting the value of '" + LOADBALANCER_ZONE + "' to " + zone);
    }
    //设置 `LoadBalancerZoneConfig`
    zoneConfig.setZone(zone);
    }
    }
    
    private String getZoneFromEureka() {
    String zone;
    //是否配置了 spring.cloud.loadbalancer.eureka.approximateZoneFromHostname 为 true
    boolean approximateZoneFromHostname = eurekaLoadBalancerProperties.isApproximateZoneFromHostname();
    //如果配置了,则尝试从 Eureka 配置的 host 名称中提取
    //实际就是以 . 分割 host,然后第二个就是 zone
    //例如 www.zone1.com 就是 zone1
    if (approximateZoneFromHostname && eurekaConfig != null) {
    return ZoneUtils.extractApproximateZone(this.eurekaConfig.getHostName(false));
    }
    else {
    //否则,从 metadata map 中取 zone 这个 key
    zone = eurekaConfig == null ? null : eurekaConfig.getMetadataMap().get("zone");
    //如果这个 key 不存在,则从配置中以 region 从 zone 列表取第一个 zone 作为当前 zone
    if (StringUtils.isEmpty(zone) && clientConfig != null) {
    String[] zones = clientConfig.getAvailabilityZones(clientConf
    ad8
    ig.getRegion());
    // Pick the first one from the regions we want to connect to
    zone = zones != null && zones.length > 0 ? zones[0] : null;
    }
    return zone;
    }
    }

    实现 SameZoneOnlyServiceInstanceListSupplier

    为了实现通过 zone 来过滤同一 zone 下的实例,并且绝对不会返回非同一 zone 下的实例,我们来编写代码:

    SameZoneOnlyServiceInstanceListSupplier

    /**
    * 只返回与当前实例同一个 Zone 的服务实例,不同 zone 之间的服务不互相调用
    */
    public class SameZoneOnlyServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier {
    /**
    * 实例元数据 map 中表示 zone 配置的 key
    */
    private final String ZONE = "zone";
    /**
    * 当前 spring cloud loadbalancer 的 zone 配置
    */
    private final LoadBalancerZoneConfig zoneConfig;
    private String zone;
    
    public SameZoneOnlyServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, LoadBalancerZoneConfig zoneConfig) {
    super(delegate);
    this.zoneConfig = zoneConfig;
    }
    
    @Override
    public Flux<List<ServiceInstance>> get() {
    return getDelegate().get().map(this::filteredByZone);
    }
    
    //通过 zoneConfig 过滤
    private List<ServiceInstance> filteredByZone(List<ServiceInstance> serviceInstances) {
    if (zone == null) {
    zone = zoneConfig.getZone();
    }
    if (zone != null) {
    List<ServiceInstance> filteredInstances = new ArrayList<>();
    for (ServiceInstance serviceInstance : serviceInstances) {
    String instanceZone = getZone(serviceInstance);
    if (zone.equalsIgnoreCase(instanceZone)) {
    filteredInstances.add(serviceInstance);
    }
    }
    if (filteredInstances.size() > 0) {
    return filteredInstances;
    }
    }
    /**
    * @see ZonePreferenceServiceInstanceListSupplier 在没有相同zone实例的时候返回的是所有实例
    * 我们这里为了实现不同 zone 之间不互相调用需要返回空列表
    */
    return List.of();
    }
    
    //读取实例的 zone,没有配置则为 null
    private String getZone(ServiceInstance serviceInstance) {
    Map<String, String> metadata = serviceInstance.g
    ad0
    etMetadata();
    if (metadata != null) {
    return metadata.get(ZONE);
    }
    return null;
    }
    }

    实现请求与请求之间隔离的负载均衡算法

    在之前章节的讲述中,我们提到了我们使用 spring-cloud-sleuth 作为链路追踪库。我们想可以通过其中的 traceId,来区分究竟是否是同一个请求。

    RoundRobinWithRequestSeparatedPositionLoadBalancer

    //一定必须是实现ReactorServiceInstanceLoadBalancer
    //而不是ReactorLoadBalancer<ServiceInstance>
    //因为注册的时候是ReactorServiceInstanceLoadBalancer
    @Log4j2
    public class RoundRobinWithRequestSeparatedPositionLoadBalancer implements ReactorServiceInstanceLoadBalancer {
    private final ServiceInstanceListSupplier serviceInstanceListSupplier;
    //每次请求算上重试不会超过1分钟
    //对于超过1分钟的,这种请求肯定比较重,不应该重试
    private final LoadingCache<Long, AtomicInteger> positionCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES)
    //随机初始值,防止每次都是从第一个开始调用
    .build(k -> new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1000)));
    private final String serviceId;
    private final Tracer tracer;
    
    public RoundRobinWithRequestSeparatedPositionLoadBalancer(ServiceInstanceListSupplier serviceInstanceListSupplier, String serviceId, Tracer tracer) {
    this.serviceInstanceListSupplier = serviceInstanceListSupplier;
    this.serviceId = serviceId;
    this.tracer = tracer;
    }
    
    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
    return serviceInstanceListSupplier.get().next().map(serviceInstances -> getInstanceResponse(serviceInstances));
    }
    
    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
    if (serviceInstances.isEmpty()) {
    log.warn("No servers available for service: " + this.serviceId);
    return new EmptyResponse();
    }
    return getInstanceResponseByRoundRobin(serviceInstances);
    }
    
    private Response<ServiceInstance> getInstanceResponseByRoundRobin(List<ServiceInstance> serviceInstances) {
    if (serviceInstances.isEmpty()) {
    log.warn("No servers
    ad8
    available for service: " + this.serviceId);
    return new EmptyResponse();
    }
    //为了解决原始算法不同调用并发可能导致一个请求重试相同的实例
    Span currentSpan = tracer.currentSpan();
    if (currentSpan == null) {
    currentSpan = tracer.newTrace();
    }
    long l = currentSpan.context().traceId();
    AtomicInteger seed = positionCache.get(l);
    int s = seed.getAndIncrement();
    int pos = s % serviceInstances.size();
    log.info("position {}, seed: {}, instances count: {}", pos, s, serviceInstances.size());
    return new DefaultResponse(serviceInstances.stream()
    //实例返回列表顺序可能不同,为了保持一致,先排序再取
    .sorted(Comparator.comparing(ServiceInstance::getInstanceId))
    .collect(Collectors.toList()).get(pos));
    }
    }

    将上述两个元素加入我们自定义的 LoadBalancerClient 并启用

    在上一节,我们提到了可以通过

    @LoadBalancerClients
    注解配置默认的负载均衡器配置,我们这里就是通过这种方式进行配置。首先在 spring.factories 中添加自动配置类:

    spring.factories

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    com.github.hashjang.spring.cloud.iiford.service.common.auto.LoadBalancerAutoConfiguration

    然后编写这个自动配置类,其实很简单,就是添加一个

    @LoadBalancerClients
    注解,设置默认配置类:

    LoadBalancerAutoConfiguration

    @Configuration(proxyBeanMethods = false)
    @LoadBalancerClients(defaultConfiguration = DefaultLoadBalancerConfiguration.class)
    public class LoadBalancerAutoConfiguration {
    }

    编写这个默认配置类,将上面我们实现的两个类,组装进去:

    DefaultLoadBalancerConfiguration<
    ad8
    /code>

    [code]@Configuration(proxyBeanMethods = false) public class DefaultLoadBalancerConfiguration { @Bean public ServiceInstanceListSupplier serviceInstanceListSupplier( DiscoveryClient discoveryClient, Environment env, ConfigurableApplicationContext context, LoadBalancerZoneConfig zoneConfig ) { ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context .getBeanProvider(LoadBalancerCacheManager.class); return //开启服务实例缓存 new CachingServiceInstanceListSupplier( //只能返回同一个 zone 的服务实例 new SameZoneOnlyServiceInstanceListSupplier( //启用通过 discoveryClient 的服务发现 new DiscoveryClientServiceInstanceListSupplier( discoveryClient, env ), zoneConfig ) , cacheManagerProvider.getIfAvailable() ); } @Bean public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer( Environment environment, ServiceInstanceListSupplier serviceInstanceListSupplier, Tracer tracer ) { String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME); return new RoundRobinWithRequestSeparatedPositionLoadBalancer( serviceInstanceListSupplier, name, tracer ); } }

    这样,我们就实现了自定义的负载均衡器。也理解了 Spring Cloud LoadBalancer 的使用。接下来,我们来单元测试下这些功能。集成测试后面会有单独的章节,不用着急。

    单元测试上述功能

    通过这届单元测试,我们也可以了解下一般我们实现 spring cloud 自定义的基础组件,怎么去单元测试。

    这里的单元测试主要测试三个场景:

    1. 只返回同一个 zone 下的实例,其他 zone 的不会返回
    2. 对于多个请求,每个请求返回的与上次的实例不同。
    3. 对于多线程的每个请求,如果重试,返回的都是不同的实例

    编写代码:

    LoadBalancerTest

    //SpringRunner也
    231f
    包含了MockitoJUnitRunner,所以 @Mock 等注解也生效了
    @RunWith(SpringRunner.class)
    @SpringBootTest(properties = {LoadBalancerEurekaAutoConfiguration.LOADBALANCER_ZONE + "=zone1"})
    public class LoadBalancerTest {
    
    @EnableAutoConfiguration(exclude = EurekaDiscoveryClientConfiguration.class)
    @Configuration
    public static class App {
    @Bean
    public DiscoveryClient discoveryClient() {
    ServiceInstance zone1Instance1 = Mockito.mock(ServiceInstance.class);
    ServiceInstance zone1Instance2 = Mockito.mock(ServiceInstance.class);
    ServiceInstance zone2Instance3 = Mockito.mock(ServiceInstance.class);
    Map<String, String> zone1 = Map.ofEntries(
    Map.entry("zone", "zone1")
    );
    Map<String, String> zone2 = Map.ofEntries(
    Map.entry("zone", "zone2")
    );
    when(zone1Instance1.getMetadata()).thenReturn(zone1);
    when(zone1Instance1.getInstanceId()).thenReturn("instance1");
    when(zone1Instance2.getMetadata()).thenReturn(zone1);
    when(zone1Instance2.getInstanceId()).thenReturn("instance2");
    when(zone2Instance3.getMetadata()).thenReturn(zone2);
    when(zone2Instance3.getInstanceId()).thenReturn("instance3");
    DiscoveryClient mock = Mockito.mock(DiscoveryClient.class);
    Mockito.when(mock.getInstances("testService"))
    .thenReturn(List.of(zone1Instance1, zone1Instance2, zone2Instance3));
    return mock;
    }
    }
    
    @Autowired
    private LoadBalancerClientFactory loadBalancerClientFactory;
    @Autowired
    private Tracer tracer;
    
    /**
    * 只返回同一个 zone 下的实例
    */
    @Test
    public void testFilteredByZone() {
    ReactiveLoadBalancer<ServiceInstance> testService =
    loadBalancerClientFactory.getInstance("testService");
    for (int i = 0; i < 100; i++) {
    ServiceInstance server = Mono.from(testService.choose()).block().getServer();
    //必须处于和当前实例同一个zone下
    Assert.assertEquals(server.getMetadata().get("zone"), "zone1");
    }
    }
    
    /**
    * 返回不同的实例
    */
    @Test
    public void testReturnNext() {
    ReactiveLoadBalancer<ServiceInstance> testService =
    loadBalancerClientFactory.getInstance("testService");
    //获取服务实例
    ServiceInstance server1 = Mono.from(testService.choose()).block().getServer();
    ServiceInstance server2 = Mono.from(testService.choose()).block().getServer();
    //每次选择的是不同实例
    Assert.assertNotEquals(server1.getInstanceId(), server2.getInstanceId());
    }
    
    /**
    * 跨线程,默认情况下是可能返回同一实例的,在我们的实现下,保持
    * span 则会返回下一个实例,这样保证多线程环境同一个 request 重试会返回下一实例
    * @throws Exception
    */
    @Test
    public void testSameSpanReturnNext() throws Exception {
    Span span = tracer.nextSpan();
    //测试 100 次
    for (int i = 0; i < 100; i++) {
    try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {
    ReactiveLoadBalancer<ServiceInstance> testService =
    loadBalancerClientFactory.getInstance("testService");
    //获取实例
    ServiceInstance server1 = Mono.from(testService.choose()).block().getServer();
    AtomicReference<ServiceInstance> server2 = new AtomicReference<>();
    Thread thread = new Thread(() -> {
    //保持 trace,这样就会认为仍然是同一个请求上下文,这样模拟重试
    try (Tracer.SpanInScope cleared2 = tracer.withSpanInScope(span)) {
    server2.set(Mono.from(testService.choose()).block().getServer());
    }
    });
    thread.start();
    thread.join();
    System.out.println(i);
    Assert.assertNotEquals(server1.getInstanceId(), server2.get().getInstanceId());
    }
    }
    }
    }

    运行测试,测试通过。

    微信搜索“我的编程喵”关注公众号,加作者微信,每日一刷,轻松提升技术,斩获各种offer

    内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: