spring cloud bus介绍与源码分析
spring cloud bus介绍与源码分析
简介
根据官方文档,spring cloud bus为分布式的轻量级消息代理服务,可以用来状态改变的时候进行广播,比如配置值发生变换,目前支持AMQP协议的代理服务。也就是说目前spring cloud bus只支持rabbitmq 以及kafkamq,kafka普遍用于大数据传输方面。
运用场景
目前所推荐的spring cloud bus的运用场景大多都是与spring cloud config 使用实现子服务动态刷新获取server端的配置文件变动。实现简单只需要简单配置,不需要代码实现。
和spring cloud config 组合的动态刷新获取最新配置的流程图,读者若想获取关于spring cloud config 的运用和原理可以参考笔者的这篇文章,spring cloud config的运用与原理
源码分析
关于spring cloud bus的源码挺简洁的,只用到了mq的部分特性,其核心类是BusAutoConfiguration类,此类完成了bus的自动配置。
@Configuration @ConditionalOnBusEnabled @EnableBinding(SpringCloudBusClient.class) @EnableConfigurationProperties(BusProperties.class) @AutoConfigureBefore(BindingServiceConfiguration.class) // so stream bindings work properly @AutoConfigureAfter(LifecycleMvcEndpointAutoConfiguration.class) // so actuator endpoints have needed dependencies public class BusAutoConfiguration implements ApplicationEventPublisherAware {
可以看出,引入了SpringCloudBusClient,BusProperties,BindingServiceConfiguration,LifecycleMvcEndpointAutoConfiguration这几个类。接下来看这几个类的作用
public interface SpringCloudBusClient { /** * Name of the input channel for Spring Cloud Bus. */ String INPUT = "springCloudBusInput"; /** * Name of the output channel for Spring Cloud Bus. */ String OUTPUT = "springCloudBusOutput"; @Output(SpringCloudBusClient.OUTPUT) MessageChannel springCloudBusOutput(); @Input(SpringCloudBusClient.INPUT) SubscribableChannel springCloudBusInput();
SpringCloudBusClient这个类显而易见的得知是创建了两个channel在mq上。
关于BusProperties,从类名就得知是bus的属性类。
BindingServiceConfiguration在spring boot 加载bus自动配置前的做的操作
private static Map<String, BinderConfiguration> getBinderConfigurations(BinderTypeRegistry binderTypeRegistry, BindingServiceProperties bindingServiceProperties) { Map<String, BinderConfiguration> binderConfigurations = new HashMap(); Map<String, BinderProperties> declaredBinders = bindingServiceProperties.getBinders(); boolean defaultCandidatesExist = false; for(Iterator binderPropertiesIterator = declaredBinders.entrySet().iterator(); !defaultCandidatesExist && binderPropertiesIterator.hasNext(); defaultCandidatesExist = ((BinderProperties)((Entry)binderPropertiesIterator.next()).getValue()).isDefaultCandidate()) { } List<String> existingBinderConfigurations = new ArrayList(); Iterator var7 = declaredBinders.entrySet().iterator(); Entry binderEntry; while(var7.hasNext()) { binderEntry = (Entry)var7.next(); BinderProperties binderProperties = (BinderProperties)binderEntry.getValue(); if (binderTypeRegistry.get((String)binderEntry.getKey()) != null) { binderConfigurations.put(binderEntry.getKey(), new BinderConfiguration((String)binderEntry.getKey(), binderProperties.getEnvironment(), binderProperties.isInheritEnvironment(), binderProperties.isDefaultCandidate())); existingBinderConfigurations.add(binderEntry.getKey()); } else { Assert.hasText(binderProperties.getType(), "No 'type' property present for custom binder " + (String)binderEntry.getKey()); binderConfigurations.put(binderEntry.getKey(), new BinderConfiguration(binderProperties.getType(), binderProperties.getEnvironment(), binderProperties.isInheritEnvironment(), binderProperties.isDefaultCandidate())); existingBinderConfigurations.add(binderEntry.getKey()); } } var7 = binderConfigurations.entrySet().iterator(); while(var7.hasNext()) { binderEntry = (Entry)var7.next(); if (((BinderConfiguration)binderEntry.getValue()).isDefaultCandidate()) { defaultCandidatesExist = true; } } if (!defaultCandidatesExist) { var7 = binderTypeRegistry.getAll().entrySet().iterator(); while(var7.hasNext()) { binderEntry = (Entry)var7.next(); if (!existingBinderConfigurations.contains(binderEntry.getKey())) { binderConfigurations.put(binderEntry.getKey(), new BinderConfiguration((String)binderEntry.getKey(), new HashMap(), true, true)); } } } return binderConfigurations; }
这个方法获取所有binder的实体信息存储在一个map。LifecycleMvcEndpointAutoConfiguration配置完成的后web方面配置。所以spring cloud bus
启动时执行的流程是,创建channel->加载对应的配置->扫描所有的serverice binder的配置信息储存到map->对web的配置。
关于post请求的触发,spring cloud bus 启动了许多的listener对各个post的请求的监听。
比如发送一个bus/refresh 的post请求
@Endpoint(id = "bus-refresh") // TODO: document new id public class RefreshBusEndpoint extends AbstractBusEndpoint { public RefreshBusEndpoint(ApplicationEventPublisher context, String id) { super(context, id); } @WriteOperation public void busRefreshWithDestination(@Selector String destination) { // TODO: // document // destination publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), destination)); } @WriteOperation public void busRefresh() { publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), null)); } }
会触发一个RefreshRemoteApplicationEvent事件,根据实例的id,以及输入的目标范围去刷新对应的service。
总结
spring cloud bus目前主要用于和spring cloud config的组合使用,并未完全用到mq的所有特性,在spring cloud stream中对于mq的使用以及扩展更加完善。
- Spring Cloud源码分析(一)Eureka
- spring cloud服务发现和注册源码分析
- Spring AOP介绍及源码分析
- 【springcloud】1.微服务之springcloud-》eureka源码分析之请叫我灵魂画师。。。
- spring cloud集成 consul源码分析
- spring-cloud源码解析-hystrix的基本介绍和配置属性说明
- 微服务之SpringCloud实战(四):SpringCloud Eureka源码分析
- SpringCloud-源码分析 Hystrix 熔断器
- springcloud 入门 5 (feign源码分析)
- spring cloud zuul网关服务重试请求配置和源码分析
- Spring AOP介绍及源码分析
- 干货|Spring Cloud Bus 消息总线介绍
- spring cloud 集成 prometheus 源码分析
- 【spring cloud】源码分析(一)
- Spring AOP介绍及源码分析
- Spring Cloud源码分析(二)Ribbon
- Spring Cloud源码分析(二)Ribbon(续)
- Spring Cloud Netflix Eureka client源码分析