您的位置:首页 > 其它

ActiveMQ in Action(2)

2010-06-22 23:18 204 查看
2.2 Transport

ActiveMQ目前支持的transport有:VM Transport、TCP
Transport、SSL Transport、Peer Transport、UDP Transport、Multicast
Transport、HTTP and HTTPS Transport、Failover Transport、Fanout
Transport、Discovery Transport、ZeroConf
Transport等。以下简单介绍其中的几种,更多请参考Apache官方文档。



2.2.1 VM Transport

VM
transport允许在VM内部通信,从而避免了网络传输的开销。这时候采用的连接不是socket连接,而是直接地方法调用。 第一个创建VM
连接的客户会启动一个embed VM broker,接下来所有使用相同的broker
name的VM连接都会使用这个broker。当这个broker上所有的连接都关闭的时候,这个broker也会自动关闭。


以下是配置语法:

vm://brokerName?transportOptions

例如:vm://broker1?marshal=false&broker.persistent=false

Transport Options的可选值如下:

Option Name
Default Value
Description
MarshalfalseIf true, forces each command sent over the transport to be marshlled
and unmarshlled using a WireFormat
wireFormatdefaultThe name of the WireFormat to use
wireFormat.*All the properties with this prefix are used to configure the
wireFormat
createtrueIf the broker should be created on demand if it does not allready
exist. Only supported in ActiveMQ 4.1
broker.*All the properties with this prefix are used to configure the
broker. See Configuring Wire Formats for more information
以下是高级配置语法:

vm:(broker:(tcp://localhost)?brokerOptions)?transportOptions

vm:broker:(tcp://localhost)?brokerOptions


例如:vm:(broker:(tcp://localhost:6000)?persistent=false)?marshal=false

Transport Options的可选值如下:

Option Name
Default Value
Description
marshalfalseIf true, forces each command sent over the transport to be marshlled
and unmarshlled using a WireFormat
wireFormatdefaultThe name of the WireFormat to use
wireFormat.*All the properties with this prefix are used to configure the
wireFormat
使用配置文件的配置语法:


vm://localhost?brokerConfig=xbean:activemq.xml

例如:vm://
localhost?brokerConfig=xbean:com/test/activemq.xml



使用Spring的配置:

Xml代码


<
bean



id

=
"broker"


class

=
"org.apache.activemq.xbean.BrokerFactoryBean"

>





<
property



name

=
"config"


value

=
"classpath:org/apache/activemq/xbean/activemq.xml"


/>





<
property



name

=
"start"


value

=
"true"


/>




</
bean
>






<
bean



id

=
"connectionFactory"


class

=
"org.apache.activemq.ActiveMQConnectionFactory"


depends-on

=
"broker"

>





<
property



name

=
"brokerURL"


value

=
"vm://localhost"

/>




</
bean
>




Xml代码

<
bean

id
=
"broker"

class
=
"org.apache.activemq.xbean.BrokerFactoryBean"
>


<
property

name
=
"config"

value
=
"classpath:org/apache/activemq/xbean/activemq.xml"

/>


<
property

name
=
"start"

value
=
"true"

/>


</
bean
>




<
bean

id
=
"connectionFactory"

class
=
"org.apache.activemq.ActiveMQConnectionFactory"

depends-on
=
"broker"
>


<
property

name
=
"brokerURL"

value
=
"vm://localhost"
/>


</
bean
>


<bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
  <property name="config" value="classpath:org/apache/activemq/xbean/activemq.xml" />
  <property name="start" value="true" />
</bean>

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" depends-on="broker">
  <property name="brokerURL" value="vm://localhost"/>
</bean>


如果persistent是true,那么ActiveMQ会在当前目录下创建一个缺省值是activemq-data的目录用于持久化保存数据。需要注
意的是,如果程序中启动了多个不同名字的VM broker,那么可能会有如下警告:Failed to start jmx connector:
Cannot bind to URL [rmi://localhost:1099/jmxrmi]:
javax.naming.NameAlreadyBoundException…可以通过在transportOptions中追加
broker.useJmx=false来禁用JMX来避免这个警告。



2.2.2 TCP Transport

TCP transport 允许客户端通过TCP
socket连接到远程的broker。以下是配置语法:

tcp://hostname:port?transportOptions


Transport Options的可选值如下:

Option Name
Default Value
Description
minmumWireFormatVersion0The minimum version wireformat that is allowed
tracefalseCauses all commands that are sent over the transport to be logged
useLocalHosttrueWhen true, it causes the local machines name to resolve to
"localhost".
socketBufferSize64 * 1024Sets the socket buffer size in bytes
soTimeout0sets the socket timeout in milliseconds
connectionTimeout30000A non-zero value specifies the connection timeout in milliseconds. A
zero value means wait forever for the connection to be established.
Negative values are ignored.
wireFormatdefaultThe name of the WireFormat to use
wireFormat.*All the properties with this prefix are used to configure the
wireFormat. See Configuring Wire Formats for more information
例如:tcp://localhost:61616?trace=false



2.2.3 Failover Transport

Failover
Transport是一种重新连接的机制,它工作于其它transport的上层,用于建立可靠的传输。它的配置语法允许制定任意多个复合的URI。
Failover
transport会自动选择其中的一个URI来尝试建立连接。如果没有成功,那么会选择一个其它的URI来建立一个新的连接。以下是配置语法:


failover:(uri1,...,uriN)?transportOptions

failover:uri1,...,uriN


Transport Options的可选值如下:

Option Name
D
efault Value
Description
initialReconnectDelay10How long to wait before the first reconnect attempt (in ms)
maxReconnectDelay30000The maximum amount of time we ever wait between reconnect attempts
(in ms)
useExponentialBackOfftrueShould an exponential backoff be used between reconnect attempts
backOffMultiplier2The exponent used in the exponential backoff attempts
maxReconnectAttempts0If not 0, then this is the maximum number of reconnect attempts
before an error is sent back to the client
randomizetrueuse a random algorithm to choose the URI to use for reconnect from
the list provided
backupfalseinitialize and hold a second transport connection - to enable fast
failover
例如:failover:(tcp://localhost:61616,tcp:
//remotehost:61616)?initialReconnectDelay=100



2.2.4 Discovery transport

Discovery
transport是可靠的tranport。它使用Discovery transport来定位用来连接的URI列表。以下是配置语法:


discovery:(discoveryAgentURI)?transportOptions


discovery:discoveryAgentURI

Transport Options的可选值如下:

Option Name
Default Value
Description
initialReconnectDelay10How long to wait before the first reconnect attempt
maxReconnectDelay30000The maximum amount of time we ever wait between reconnect attempts
useExponentialBackOfftrueShould an exponential backoff be used btween reconnect attempts
backOffMultiplier2The exponent used in the exponential backoff attempts
maxReconnectAttempts0If not 0, then this is the maximum number of reconnect attempts
before an error is sent back to the client
例如:discovery:(multicast://default)?initialReconnectDelay=100


为了使用Discovery来发现broker,需要为broker启用discovery agent。 以下是XML配置文件中的一个例子:

Xml代码


<
broker



name

=
"foo"

>





<
transportConnectors
>





<
transportConnector



uri

=
"tcp://localhost:0"


discoveryUri

=
"multicast://default"

/>





</
transportConnectors
>




...

</
broker
>




Xml代码

<
broker

name
=
"foo"
>


<
transportConnectors
>


<
transportConnector

uri
=
"tcp://localhost:0"

discoveryUri
=
"multicast://default"
/>


</
transportConnectors
>


...

</
broker
>


<broker name="foo">
   <transportConnectors>
      <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>
    </transportConnectors>
    ...
</broker>

在使用Failover Transport或Discovery
transport等能够自动重连的transport的时候,需要注意的是:设想有两个broker,它们都启用AMQ Message
Store作为持久化存储,有一个producer和一个consumer连接到某个queue。当因其中一个broker失效时而切换到另一个
broker的时候,如果失效的broker的queue中还有未被consumer消费的消息,那么这个queue里的消息仍然滞留在失效broker
的中,直到失效的broker被修复并重新切换回这个被修复的broker后,之前被保留的消息才会被consumer消费掉。如果被处理的消息有时序限
制,那么应用程序就需要处理这个问题。另外也可以通过ActiveMQ集群来解决这个问题。

在transport重连的时候,可以在connection上注册TransportListener来获得回调,例如:

Java代码


(ActiveMQConnection)connection).addTransportListener(
new


TransportListener() {



public



void


onCommand(Object cmd) {


}




public



void


onException(IOException exp) {


}




public



void


transportInterupted() {



// The transport has suffered an interruption from which it hopes to recover.




}




public



void


transportResumed() {



// The transport has resumed after an interruption.



}

});
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: