您的位置:首页 > 编程语言 > Java开发

spring cloud bus介绍与源码分析

2019-04-18 17:21 861 查看

spring cloud bus介绍与源码分析

简介

根据官方文档,spring cloud bus为分布式的轻量级消息代理服务,可以用来状态改变的时候进行广播,比如配置值发生变换,目前支持AMQP协议的代理服务。也就是说目前spring cloud bus只支持rabbitmq 以及kafkamq,kafka普遍用于大数据传输方面。

运用场景

目前所推荐的spring cloud bus的运用场景大多都是与spring cloud config 使用实现子服务动态刷新获取server端的配置文件变动。实现简单只需要简单配置,不需要代码实现。

和spring cloud config 组合的动态刷新获取最新配置的流程图,读者若想获取关于spring cloud config 的运用和原理可以参考笔者的这篇文章,spring cloud config的运用与原理

源码分析

关于spring cloud bus的源码挺简洁的,只用到了mq的部分特性,其核心类是BusAutoConfiguration类,此类完成了bus的自动配置。

@Configuration
@ConditionalOnBusEnabled
@EnableBinding(SpringCloudBusClient.class)
@EnableConfigurationProperties(BusProperties.class)
@AutoConfigureBefore(BindingServiceConfiguration.class)
// so stream bindings work properly
@AutoConfigureAfter(LifecycleMvcEndpointAutoConfiguration.class)
// so actuator endpoints have needed dependencies
public class BusAutoConfiguration implements ApplicationEventPublisherAware {

可以看出,引入了SpringCloudBusClient,BusProperties,BindingServiceConfiguration,LifecycleMvcEndpointAutoConfiguration这几个类。接下来看这几个类的作用

public interface SpringCloudBusClient {

/**
* Name of the input channel for Spring Cloud Bus.
*/
String INPUT = "springCloudBusInput";

/**
* Name of the output channel for Spring Cloud Bus.
*/
String OUTPUT = "springCloudBusOutput";

@Output(SpringCloudBusClient.OUTPUT)
MessageChannel springCloudBusOutput();

@Input(SpringCloudBusClient.INPUT)
SubscribableChannel springCloudBusInput();

SpringCloudBusClient这个类显而易见的得知是创建了两个channel在mq上。
关于BusProperties,从类名就得知是bus的属性类。

BindingServiceConfiguration在spring boot 加载bus自动配置前的做的操作

private static Map<String, BinderConfiguration> getBinderConfigurations(BinderTypeRegistry binderTypeRegistry, BindingServiceProperties bindingServiceProperties) {
Map<String, BinderConfiguration> binderConfigurations = new HashMap();
Map<String, BinderProperties> declaredBinders = bindingServiceProperties.getBinders();
boolean defaultCandidatesExist = false;

for(Iterator binderPropertiesIterator = declaredBinders.entrySet().iterator(); !defaultCandidatesExist && binderPropertiesIterator.hasNext(); defaultCandidatesExist = ((BinderProperties)((Entry)binderPropertiesIterator.next()).getValue()).isDefaultCandidate()) {
}

List<String> existingBinderConfigurations = new ArrayList();
Iterator var7 = declaredBinders.entrySet().iterator();

Entry binderEntry;
while(var7.hasNext()) {
binderEntry = (Entry)var7.next();
BinderProperties binderProperties = (BinderProperties)binderEntry.getValue();
if (binderTypeRegistry.get((String)binderEntry.getKey()) != null) {
binderConfigurations.put(binderEntry.getKey(), new BinderConfiguration((String)binderEntry.getKey(), binderProperties.getEnvironment(), binderProperties.isInheritEnvironment(), binderProperties.isDefaultCandidate()));
existingBinderConfigurations.add(binderEntry.getKey());
} else {
Assert.hasText(binderProperties.getType(), "No 'type' property present for custom binder " + (String)binderEntry.getKey());
binderConfigurations.put(binderEntry.getKey(), new BinderConfiguration(binderProperties.getType(), binderProperties.getEnvironment(), binderProperties.isInheritEnvironment(), binderProperties.isDefaultCandidate()));
existingBinderConfigurations.add(binderEntry.getKey());
}
}

var7 = binderConfigurations.entrySet().iterator();

while(var7.hasNext()) {
binderEntry = (Entry)var7.next();
if (((BinderConfiguration)binderEntry.getValue()).isDefaultCandidate()) {
defaultCandidatesExist = true;
}
}

if (!defaultCandidatesExist) {
var7 = binderTypeRegistry.getAll().entrySet().iterator();

while(var7.hasNext()) {
binderEntry = (Entry)var7.next();
if (!existingBinderConfigurations.contains(binderEntry.getKey())) {
binderConfigurations.put(binderEntry.getKey(), new BinderConfiguration((String)binderEntry.getKey(), new HashMap(), true, true));
}
}
}

return binderConfigurations;
}

这个方法获取所有binder的实体信息存储在一个map。LifecycleMvcEndpointAutoConfiguration配置完成的后web方面配置。所以spring cloud bus
启动时执行的流程是,创建channel->加载对应的配置->扫描所有的serverice binder的配置信息储存到map->对web的配置。

关于post请求的触发,spring cloud bus 启动了许多的listener对各个post的请求的监听。

比如发送一个bus/refresh 的post请求

@Endpoint(id = "bus-refresh") // TODO: document new id
public class RefreshBusEndpoint extends AbstractBusEndpoint {

public RefreshBusEndpoint(ApplicationEventPublisher context, String id) {
super(context, id);
}

@WriteOperation
public void busRefreshWithDestination(@Selector String destination) { // TODO:
// document
// destination
publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), destination));
}

@WriteOperation
public void busRefresh() {
publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), null));
}

}

会触发一个RefreshRemoteApplicationEvent事件,根据实例的id,以及输入的目标范围去刷新对应的service。

总结

spring cloud bus目前主要用于和spring cloud config的组合使用,并未完全用到mq的所有特性,在spring cloud stream中对于mq的使用以及扩展更加完善。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: