您的位置:首页 > 运维架构 > Apache

Apache Camel - 13 - JMS组件(ActiveMQ)

2018-01-24 23:02 267 查看
Apache Camel JMS DSL

用于配置JMS组件的通用URI选项

Option
Default value
Description
autoStartup
true
Controls whether consumers start listening right after creation. If set to false, you’ll need to invoke the start() method on the consumer manually at a later time.
控制消费者是否在启动之后立即开始监听。如果设置为false,则需要稍后手动调用消费者的启动方法。(??不是很明白)
clientId
null
Sets the JMS client ID, which must be unique among all connections to the JMS broker. The client ID set in the ConnectionFactory overrides this one if set.
设置JMS客户端ID,该ID在与JMS代理的所有的连接中必须是唯一的。如果设置,ConnectionFactory
中设置的客户端ID将覆盖此设置(??)。
concurrentConsumers
1
Sets the number of consumer threads to use. It’s a good idea to increase this for high-volume queues, but it’s not advisable to use more than one concurrent consumer for JMS topics,
because this will result in multiple copies of the same message.
设置要使用的使用者线程的数量,为大容量队列增加这个值是个好注意。但这不是JMS主题,因为这会导致同一个消息的多个副本。
disableReplyTo
false
Specifies whether Camel should ignore the JMSReplyTo header in any messages or not. Set this if you don’t want Camel to send a reply back to the destination specified in the JMSReplyTo
header.
指定Camel是否应忽略任何消息中的JMSReplyTo头。如果你不希望Camel发送应答到JMSReplyTo头中指定的目的地,请设置此项。
durableSubscriptionName
null
Specifies the name of the durable topic subscription. If clientId is also set, the topic subscription is made durable automatically.
指定持久主题订阅的名称。如果还设置了客户端ID,则主题订阅自动失效。
maxConcurrentConsumers
1
Sets the maximum number of consumer threads to use. If this value is higher than concurrentConsumers, new consumers are started dynamically as load demands. If load drops down, these
extra consumers will be freed and the number of consumers will be equal to concurrentConsumers again. Increasing this value isn’t advisable when using topics.
设置要使用的最大消费者线程数。如果此值高于concurrentConsumers,则新的使用者将根据负载需求动态启动。
如果负载下降,这些额外的消费者将被自动释放,并且消费者的数量将再次等于并发消费者。
使用主题时,不建议增加此值。
replyTo
null
Sets the destination that the reply is sent to. This overrides the JMSReplyTo header in the message. By setting this, Camel will use a fixed reply queue. By default,Camel will use
a temporary reply queue.
设置回复发送到的目的地,这覆盖了消息中心的JMSReplyTo头。通过设置这个,Camel将使用固定的答复队列。
默认情况下,Camel将使用临时答复队列。
requestTimeout
20000
Specifies the time in milliseconds before Camel will timeout sending a message.
指定Camel超时发送消息前的时间(毫秒单位)
selector
null
Sets the JMS message selector expression. Only messages passing this predicate will be consumed.
设置JMS消息选择器表达式,只有通过这个谓词的消息才会消耗。
timeToLive
null
When sending messages, sets the amount of time the message should live. After this time expires, the JMS provider may discard the message.
发送消息时,设置消息的生存时间。在这段时间之后,JMS提供者可能会丢弃该消息。
transacted
false
Enables transacted sending and receiving of messages in InOnly mode.
Apache Camel之JMS(ActiveMQ)路由

WHAT IS JMS?(什么是JMS?)

JMS (Java Message Service) is a Java API that allows you to create, send, receive, and read messages. It also mandates that messaging is asynchronous and has specific elements of reliability, like guaranteed and once-and-only-once delivery. JMS is the de
facto messaging solution in the Java community.

In JMS, message consumers and producers talk to one another through an intermediary—a JMS destination. As shown in figure 2.4, a destination can be either a queue or a topic. Queues are strictly point-to-point, where each message has only one consumer. Topics
operate on a publish/subscribe scheme; a single message may be delivered to many consumers if they have subscribed to the topic.

JMS also provides a ConnectionFactory that clients (like Camel) can use to create a connection with a JMS provider. JMS providers are usually referred to as brokers because they manage the communication between a message producer and a message consumer.

JMS(Java消息服务)是一种Java API,允许您创建,发送,接收和读取消息。 它还要求消息传递是异步的,并且具有特定的可靠性要素,例如有保证和一次又一次的传递。

JMS是Java社区中事实上的消息传递解决方案。

在JMS中,消息消费者和生产者通过中介(JMS目标)相互通信。 如图2.4所示,目标可以是队列或主题。 队列是严格的点对点,每个消息只有一个消费者。 主题在发布/订阅计划上运行; 如果他们订阅了该主题,则可以将单个消息传递给许多消费者。

JMS还提供了一个ConnectionFactory,客户端(如Camel)可以使用它来创建与JMS提供者的连接。 JMS提供程序通常被称为代理程序,因为它们管理消息生产者和消息使用者之间的通信。

HOW TO CONFIGURE CAMEL TO USE A JMS PROVIDER

(如何配置Camel来使用JMS提供程序)

To connect Camel to a specific JMS provider, you need to configure Camel’s JMS component with an appropriate ConnectionFactory.

Apache ActiveMQ is one of the most popular open source JMS providers, and it’s the primary JMS broker that the Camel team uses to test the JMS component. As such, we’ll be using it to demonstrate JMS concepts within the book. For more information on Apache
ActiveMQ, we recommend ActiveMQ in Action by Bruce Snyder, Dejan Bosanac, and Rob Davies, available from Manning Publications.

要将Camel连接到特定的JMS提供程序,您需要使用适当的ConnectionFactory配置Camel的JMS组件。

Apache ActiveMQ是最受欢迎的开源JMS提供者之一,也是Camel团队用来测试JMS组件的主要JMS代理。 因此,我们将使用它来展示本书中的JMS概念。

ApacheActiveMQ的更多信息,我们推荐由Manning Publications出版的Bruce Snyder,Dejan Bosanac和Rob Davies的ActiveMQ。



There are two types of JMS destinations: queues and topics. The queue is a point-to-point channel, where each message has only one recipient. A topic delivers a copy of the message to all clients who have subscribed to receive it.

有两种类型的JMS目标:队列和主题。 队列是一个点对点的通道,每个消息只有一个收件人。 主题将消息的副本传递给已订阅接收消息的所有客户端。

//书中详细的内容这里不列举了,大家翻翻书就好...

我将整理好的代码,上传了,大家可以下载下来,跑跑看...

Apache Camel JMS Apache ActiveMQ 样例Demo

ActiveMQ我本地安装的是Windows版本的,大家可以自己去官网上下载安装。

进入到\bin\win64目录之后,双击运行activemq.bat,就可以了...



好,下面我们来看下代码:

使用Apache Camel向mq发送消息

package com.camel.activemq;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.log4j.PropertyConfigurator;

/**
* 从本地目录读取文件,然后发送至mq队列中
*
* @author CYX
* @time 2017年12月20日下午2:16:40
*/
public class ActiveMQCamel {

private static String user = ActiveMQConnection.DEFAULT_USER;

private static String password = ActiveMQConnection.DEFAULT_PASSWORD;

private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

public static void main(String[] args) throws Exception {

PropertyConfigurator.configure("./conf/log4j.properties");
PropertyConfigurator.configureAndWatch("./conf/log4j.properties", 1000);

CamelContext context = new DefaultCamelContext();

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);

context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

System.out.println(url + " " + user + " " + password);

context.addRoutes(new RouteBuilder() {

@Override
public void configure() throws Exception {
from("file:/temp").to("jms:queue:hoo.mq.queue");
}
});

context.start();

// 通用没有具体业务意义的代码,只是为了保证主线程不退出
synchronized (ActiveMQCamel.class) {
ActiveMQCamel.class.wait();
}

}

}


消息队列接收代码:

package com.test.client.avtivemq;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
* 消息接受者
*
* @author CYX
* @time 2016年12月13日下午4:27:31
*/
public class MessageReceiver {

/** tcp地址 */
public static final String BROKER_URL = "tcp://localhost:61616";
/** 目标 */
public static final String DESTINATION = "hoo.mq.queue";

public static void main(String[] args) throws Exception {
MessageReceiver.run();
}

public static void run() throws Exception {

Connection connection = null;
Session session = null;

try {
// 创建链接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
// 通过工厂创建一个连接
connection = factory.createConnection();
// 启动链接
connection.start();
// 创建一个session会话.
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 创建一个消息队列.
Destination destination = session.createQueue(DESTINATION);
// 创建消息制作者.
MessageConsumer consumer = session.createConsumer(destination);

consumer.setMessageListener(new MessageListener() {

@Override
public void onMessage(Message message) {

try {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
System.out.println("Received Text message : " + txtMsg.getText());
} else if (message != null) {
BytesMessage bytesMsg = (BytesMessage) message;
byte[] bytes = new byte[(int) bytesMsg.getBodyLength()];
bytesMsg.readBytes(bytes);
System.out.println("Received byte message: " + new String(bytes));
}
} catch (JMSException e) {
e.printStackTrace();
}

}
});

// 通用没有具体业务意义的代码,只是为了保证主线程不退出
synchronized (MessageReceiver.class) {
MessageReceiver.class.wait();
}

} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != session) {
session.close();
}
if (null != connection) {
connection.close();
}
}

}

}


首先,我们不要再本地目录下放任何文件;
接着,我们启动程序,Camel程序和消息接收程序都启动。

接着,我们将一个测试文件传入本地temp目录。

我们看Camel的打印日志:



再来看消息接收的程序日志



恩,没问题...

Apache Camel从MQ消息队列中接收数据

代码其实和上面差不多,稍微做了一丢丢的改动...

package com.camel.activemq2;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.log4j.PropertyConfigurator;

/**
* 从mq中读取数据
*
* @author CYX
* @time 2017年12月20日下午2:16:40
*/
public class ActiveMQCamel {

private static String user = ActiveMQConnection.DEFAULT_USER;

private static String password = ActiveMQConnection.DEFAULT_PASSWORD;

private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

public static void main(String[] args) throws Exception {

PropertyConfigurator.configure("./conf/log4j.properties");
PropertyConfigurator.configureAndWatch("./conf/log4j.properties", 1000);

CamelContext context = new DefaultCamelContext();

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);

context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

System.out.println(url + " " + user + " " + password);

context.addRoutes(new RouteBuilder() {

@Override
public void configure() throws Exception {
from("jms:queue:hoo.mq.queue")
.to("file:/temp")
.to("log:activemqcamel?showExchangeId=true");
}
});

context.start();

// 通用没有具体业务意义的代码,只是为了保证主线程不退出
synchronized (ActiveMQCamel.class) {
ActiveMQCamel.class.wait();
}

}

}


上面的代码中,我们设置路由是这样的,首先从mq中读取消息,然后写入到本地文件,接着用日志打印出来。

向mq中发送数据代码

package com.camel.activemq2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
* 消息发送者<br>
* 使用JMS方式发送接收消息<br>
* @author CYX
* @time 2016年12月13日下午4:13:58
*/
public class MessageSender {

/** 发送次数 */
public static final int SEND_NUM = 5;
/** tcp地址 */
public static final String BROKER_URL = "tcp://localhost:61616";
/** 目标 */
public static final String DESRINATION = "hoo.mq.queue";

public static void main(String[] args) {
try {
MessageSender.run();
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 发送消息
*
* @param session
* @param producer
* @throws Exception
*/
public static void sendMessage(Session session, MessageProducer producer) throws Exception {
for (int i = 0; i < SEND_NUM; i++) {
String message = "我是消息 : " + (i + 1) + "....";
TextMessage text = session.createTextMessage(message);

System.out.println("message : " + message);
producer.send(text);

}
}

public static void run() throws Exception {

Connection connection = null;
Session session = null;

try {
// 创建链接工厂.
ConnectionFactory factory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
// 通过工厂创建一个链接.
connection = factory.createConnection();
// 启动链接.
connection.start();
// 创建一个session会话.
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 创建一个消息队列.
Destination destination = session.createQueue(DESRINATION);
// 创建消息制作者.
MessageProducer producer = session.createProducer(destination);
// 设置持久化模式
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMessage(session, producer);
// 提交会话.
session.commit();

} catch (Exception e) {
e.printStackTrace();
} finally {
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}

}


我们先启动Camel程序,接着再启动消息发送程序(没有先后顺序)

我们看下消息发送程序的日志:



在看camel程序



再看temp目录:



OK...没问题...双击666....
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: