ActiveMQ源码分析(三):聊聊broker到broker的通讯
2016-06-20 02:02
615 查看
Broker到Broker的通讯涉及到activemq集群,主要由network包下的类来实现,我们主要来分析一下NetworkConnector和NetworkBridge类,读者就能大概了解broker的通讯机制了。NetworkBridge是一个借口,所以我们使用 NetworkBridge的子类DemandForwardingBridgeSupport来做讲解。
DemandForwardingBridgeSupport.java
主要分析它的start、serviceLocalCommand和serviceRemoteCommand方法
serviceLocalCommand方法
serviceRemoteCommand方法
小结:
NetworkBridge的主要作用就是根据本地配置讯息发送本地broker的相关讯息到远端broker,从远端broker接收消息做对应处理。
NetworkConnector.java
主要分析它的configureBridge方法
NetworkConnector主要用于管理NetworkBridge。
DemandForwardingBridgeSupport.java
主要分析它的start、serviceLocalCommand和serviceRemoteCommand方法
public void start() throws Exception { if (started.compareAndSet(false, true)) { if (brokerService == null) { throw new IllegalArgumentException("BrokerService is null on " + this); } //是否允许静态连接,即networkconnector配置中的staticBridge配置项 networkBridgeStatistics.setEnabled(brokerService.isEnableStatistics()); //是否是双向通讯 if (isDuplex()) { duplexInboundLocalBroker = NetworkBridgeFactory.createLocalTransport(brokerService.getBroker()); //设置监听器 duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() { @Override public void onCommand(Object o) { Command command = (Command) o; serviceLocalCommand(command); } @Override public void onException(IOException error) { serviceLocalException(error); } }); duplexInboundLocalBroker.start(); } localBroker.setTransportListener(new DefaultTransportListener() { @Override public void onCommand(Object o) { Command command = (Command) o; serviceLocalCommand(command); } @Override public void onException(IOException error) { if (!futureLocalBrokerInfo.isDone()) { futureLocalBrokerInfo.cancel(true); return; } serviceLocalException(error); } }); remoteBroker.setTransportListener(new DefaultTransportListener() { @Override public void onCommand(Object o) { Command command = (Command) o; serviceRemoteCommand(command); } @Override public void onException(IOException error) { if (!futureRemoteBrokerInfo.isDone()) { futureRemoteBrokerInfo.cancel(true); return; } serviceRemoteException(error); } }); //分别启动localBroker和remoteBroker remoteBroker.start(); localBroker.start(); if (!disposed.get()) { try { triggerStartAsyncNetworkBridgeCreation(); } catch (IOException e) { LOG.warn("Caught exception from remote start", e); } } else { LOG.warn("Bridge was disposed before the start() method was fully executed."); throw new TransportDisposedIOException(); } } }
serviceLocalCommand方法
protected void serviceLocalCommand(Command command) { //是否已释放 if (!disposed.get()) { try { //是否是MessageDispatch类型的command if (command.isMessageDispatch()) { //本地broker启动后再往下执行 safeWaitUntilStarted(); //入队计数+1 networkBridgeStatistics.getEnqueues().increment(); final MessageDispatch md = (MessageDispatch) command; //获取消息的订阅者 final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId()); if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) { if (suppressMessageDispatch(md, sub)) { LOG.debug("{} message not forwarded to {} because message came from there or fails TTL, brokerPath: {}, message: {}", new Object[]{ configuration.getBrokerName(), remoteBrokerName, Arrays.toString(md.getMessage().getBrokerPath()), md.getMessage() }); // still ack as it may be durable try { //oneway的方式发送需要确认消息 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); } finally { sub.decrementOutstandingResponses(); } return; } Message message = configureMessage(md); LOG.debug("bridging ({} -> {}), consumer: {}, destination: {}, brokerPath: {}, message: {}", new Object[]{ configuration.getBrokerName(), remoteBrokerName, (LOG.isTraceEnabled() ? message : message.getMessageId()), md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()), message }); //是否是双向的broker和符合配置的过滤规则 if (isDuplex() && NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) { try { // never request b/c they are eventually acked async remoteBroker.oneway(message); } finally { sub.decrementOutstandingResponses(); } return; } //消息是否是可持久化的或者同步发送的 if (message.isPersistent() || configuration.isAlwaysSyncSend()) { // 消息不是用异步方式发送的,所以我们只能当我们获得远端broker已经确认消息的讯息时才能在本地做确认操作,所以这里使用异步请求 remoteBroker.asyncRequest(message, new ResponseCallback() { @Override public void onCompletion(FutureResponse future) { try { Response response = future.getResult(); if (response.isException()) { ExceptionResponse er = (ExceptionResponse) response; serviceLocalException(md, er.getException()); } else { localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); networkBridgeStatistics.getDequeues().increment(); } } catch (IOException e) { serviceLocalException(md, e); } finally { sub.decrementOutstandingResponses(); } } }); } else { // 如果消息是异步发送的我们只能通过使用异步发送来传递消息以保证消息传递的可靠性(小概率丢失消息),所以这里使用同步请求 try { remoteBroker.oneway(message); localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); networkBridgeStatistics.getDequeues().increment(); } finally { sub.decrementOutstandingResponses(); } }
//销毁消息 serviceOutbound(message); } else { LOG.debug("No subscription registered with this network bridge for consumerId: {} for message: {}", md.getConsumerId(), md.getMessage()); } } else if (command.isBrokerInfo()) { futureLocalBrokerInfo.set((BrokerInfo) command); } else if (command.isShutdownInfo()) { LOG.info("{} Shutting down {}", configuration.getBrokerName(), configuration.getName()); stop(); } else if (command.getClass() == ConnectionError.class) { ConnectionError ce = (ConnectionError) command; serviceLocalException(ce.getException()); } else { switch (command.getDataStructureType()) { case WireFormatInfo.DATA_STRUCTURE_TYPE: break; default: LOG.warn("Unexpected local command: {}", command); } } } catch (Throwable e) { LOG.warn("Caught an exception processing local command", e); serviceLocalException(e); } } }
serviceRemoteCommand方法
protected void serviceRemoteCommand(Command command) { //broker是否已释放 <span style="white-space:pre"> </span>if (!disposed.get()) { try { //是否需要分发消息 if (command.isMessageDispatch()) { safeWaitUntilStarted(); MessageDispatch md = (MessageDispatch) command; serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure()); ackAdvisory(md.getMessage()); } else if (command.isBrokerInfo()) { futureRemoteBrokerInfo.set((BrokerInfo) command); } else if (command.getClass() == ConnectionError.class) { ConnectionError ce = (ConnectionError) command; serviceRemoteException(ce.getException()); } else { //是否是双向发送 if (isDuplex()) { LOG.trace("{} duplex command type: {}", configuration.getBrokerName(), command.getDataStructureType()); //是否是消息 if (command.isMessage()) { final ActiveMQMessage message = (ActiveMQMessage) command; //是否符合过滤规则 if (NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) { serviceRemoteConsumerAdvisory(message.getDataStructure()); ackAdvisory(message); } else { //如果不能允许连接的destination则直接返回 if (!isPermissableDestination(message.getDestination(), true)) { return; } // 如果是双向传递消息的,异步发送消息后需要远端broker返回其相关信息 if (canDuplexDispatch(message)) { message.setProducerId(duplexInboundLocalProducerInfo.getProducerId()); if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) { duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() { final int correlationId = message.getCommandId(); @Override public void onCompletion(FutureResponse resp) { try { Response reply = resp.getResult(); reply.setCorrelationId(correlationId); remoteBroker.oneway(reply); //increment counter when messages are received in duplex mode networkBridgeStatistics.getReceivedCount().increment(); } catch (IOException error) { LOG.error("Exception: {} on duplex forward of: {}", error, message); serviceRemoteException(error); } } }); } else { duplexInboundLocalBroker.oneway(message); networkBridgeStatistics.getReceivedCount().increment(); } //保存当前消息 serviceInboundMessage(message); } else { if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) { Response reply = new Response(); reply.setCorrelationId(message.getCommandId()); remoteBroker.oneway(reply); } } } } else { //分别根据当前消息的类型是否是连接消息类型、session消息类型、生产者消息类型、确认消息类型、消费者消息类型、关闭消息类型进行处理 switch (command.getDataStructureType()) { case ConnectionInfo.DATA_STRUCTURE_TYPE: if (duplexInitiatingConnection != null && duplexInitiatingConnectionInfoReceived.compareAndSet(false, true)) { // end of initiating connection setup - propogate to initial connection to get mbean by clientid duplexInitiatingConnection.processAddConnection((ConnectionInfo) command); } else { localBroker.oneway(command); } break; case SessionInfo.DATA_STRUCTURE_TYPE: localBroker.oneway(command); break; case ProducerInfo.DATA_STRUCTURE_TYPE: // using duplexInboundLocalProducerInfo break; case MessageAck.DATA_STRUCTURE_TYPE: //如果是远端传来的确认消息类型消息则进行确认 MessageAck ack = (MessageAck) command; DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId()); if (localSub != null) { ack.setConsumerId(localSub.getLocalInfo().getConsumerId()); localBroker.oneway(ack); } else { LOG.warn("Matching local subscription not found for ack: {}", ack); } break; case ConsumerInfo.DATA_STRUCTURE_TYPE: //添加消费者信息 localStartedLatch.await(); if (started.get()) { addConsumerInfo((ConsumerInfo) command); } else { // received a subscription whilst stopping LOG.warn("Stopping - ignoring ConsumerInfo: {}", command); } break; case ShutdownInfo.DATA_STRUCTURE_TYPE: // initiator is shutting down, controlled case // abortive close dealt with by inactivity monitor LOG.info("Stopping network bridge on shutdown of remote broker"); serviceRemoteException(new IOException(command.toString())); break; default: LOG.debug("Ignoring remote command: {}", command); } } } else { switch (command.getDataStructureType()) { case KeepAliveInfo.DATA_STRUCTURE_TYPE: case WireFormatInfo.DATA_STRUCTURE_TYPE: case ShutdownInfo.DATA_STRUCTURE_TYPE: break; default: LOG.warn("Unexpected remote command: {}", command); } } } } catch (Throwable e) { LOG.debug("Exception processing remote command: {}", command, e); serviceRemoteException(e); } } }
小结:
NetworkBridge的主要作用就是根据本地配置讯息发送本地broker的相关讯息到远端broker,从远端broker接收消息做对应处理。
NetworkConnector.java
主要分析它的configureBridge方法
protected NetworkBridge configureBridge(DemandForwardingBridgeSupport result) { //获取动态激活destination列表、静态激活destination列表、不激活destination列表 List<ActiveMQDestination> destsList = getDynamicallyIncludedDestinations(); ActiveMQDestination dests[] = destsList.toArray(new ActiveMQDestination[destsList.size()]); result.setDynamicallyIncludedDestinations(dests); destsList = getExcludedDestinations(); dests = destsList.toArray(new ActiveMQDestination[destsList.size()]); result.setExcludedDestinations(dests); destsList = getStaticallyIncludedDestinations(); dests = destsList.toArray(new ActiveMQDestination[destsList.size()]); result.setStaticallyIncludedDestinations(dests); if (durableDestinations != null) { HashSet<ActiveMQDestination> topics = new HashSet<ActiveMQDestination>(); for (ActiveMQDestination d : durableDestinations) { if( d.isTopic() ) { topics.add(d); } } ActiveMQDestination[] dest = new ActiveMQDestination[topics.size()]; dest = topics.toArray(dest); result.setDurableDestinations(dest); } return result; }小结
NetworkConnector主要用于管理NetworkBridge。
总结
network包中主要通过各种bridge完成了不同broker之间的消息传递,NetworkBridge的不同子类提供了不同场景下的交互实现,用户可以通过配置NetworkConnector来配置哪些消息需要立即被激活在broker之间传递或者延迟激活直到有advisor传递过来再激活,要激活的对象可以通过queue name(队列名)和topic name(主题名)来过滤。后面还会着重分析activemq的集群机制。相关文章推荐
- 服务器由于redis未授权访问漏洞被攻击
- js提示框
- cf - 670C Cinema(预处理)
- 自己动手编译Android源码
- git提取出两个版本之间的差异文件并打包 linux命令行
- Linux Mac 备忘
- C++中基本的语法规则
- Kafka副本同步机制理解
- 《面试宝典》例题之模拟火车站售票程序
- 使用Redis 建议/技巧
- 日常小结-层序遍历的实现leetcode 297
- [沈航软工教学] 3、4班最终成绩排行榜
- Date、 Calendar、SimpleDateFormat类
- android笔记之首页框架搭建
- python学习笔记
- C声明中的指针
- error C2512: “HelloWorld”: 没有合适的默认构造函数可用
- ExtJS6-项目创建
- error C2512: “HelloWorld”: 没有合适的默认构造函数可用
- 腾讯 VS 阿里 VS 携程消息中间件设计方案及思路