您的位置:首页 > 编程语言 > Java开发

Java 项目中使用 TongLink/Q 实现消息队列传输

2018-01-22 13:10 2251 查看
**

《 Java 项目中使用 TongLink/Q 实现消息队列传输 》

**

安装服务端、并配置文档

首先,我们同样是安装它的服务器端,和其他 MQ 免安装不同,我们需要根据exe安装,不断下一步就搞定了,网上很多图例,这里不是多说了。

然后就是配置文档了,东方通官网有相应软件的使用说明,包括目录的介绍,需要重点关注的是安装目录下的 bin、etc、log、sample 等目录。

bin 是启动的 bat 文件;

etc 则是系统配置文件,我们需要修改它的tlqjndi.conf和tlsys.conf文件,建议都看下,了解下中间件启动逻辑;

log 中间件运行产生的日志文件;

sample 在具体的生产环境中的一些实例;

有关配置文件改什么,需要依据 TongLing/Q 中间件的连接地址、jndi的队列名称等进行配置。注意:必须保证 服务器 和 客户端 的配置需要保持一致。

配置详情可以到官网去查看:http://www.tongtech.com/product-series.php?id=1

如下图:



里面提供了 TongWeb(应用服务器)、TongLB(均衡负载)等,应有尽有(还包括了企业服务总线 ESB) 非常实用的。

我这儿只提供与 Java 项目相关的实现代码!!!

服务端代码:

注意:发送队列名必须与 TongLing/Q 中间件启动前配置好的队列名一致。

package com.etc.send.data.utils;

import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Queue;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* TongLing/Q 服务端
* <p>
*
* @ClassName : QueueSenderUtils
*            </p>
*            <p>
* @Description : TODO
*              </p>
*              <p>
* @Author : HuaZai
*         </p>
*         <p>
* @ContactInformation : 1461522031@qq.com/huazai6789@aliyun.com
*                     </p>
*
* @Date : 2018年1月22日 上午11:24:47
* @Version : V1.0.0
*
*/
public class QueueSenderUtils {

// 定义常量
private static final String tcf = "tongtech.jms.jndi.JmsContextFactory";
private static final String remoteURL = "tlq://192.168.78.136:10024";
private static final String remoteFactory = "RemoteConnectionFactory";
private static final Logger log = LoggerFactory.getLogger(QueueSenderUtils.class);

public static void startSenderMsg() {

ConnectionFactory ConnFactory = null;
Connection conn = null;
Session session = null;
Queue queue = null;
MessageProducer mProducer = null;
TextMessage testMessage = null;

try {
Properties pro = new Properties();

pro.setProperty("java.naming.factory.initial", tcf);
pro.setProperty("java.naming.provider.url", remoteURL);

javax.naming.Context ctx = new javax.naming.InitialContext(pro);

ConnFactory = (javax.jms.ConnectionFactory) ctx.lookup(remoteFactory);
queue = (javax.jms.Queue) ctx.lookup("MyQueue");

conn = ConnFactory.createConnection();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
mProducer = session.createProducer(queue);
testMessage = session.createTextMessage("huazai");

// 开启连接,并发送消息
conn.start();
log.info(" = = = = = = = = = 开始发送消息。。。");
mProducer.send(testMessage);
log.info(" = = = = = = = = = 消息发送完成。。。");

// 获取发送消息的内容
TextMessage msg = (TextMessage) testConsumer.receive(2000);
log.info(" = = = = = = = = = 当前发送的消息内容为:" + msg.getText());

} catch (Exception e) {
log.info(" = = = = = = = = = 消息发送异常。。。" + e.toString());
e.printStackTrace();
} finally {
try {
if (session != null) {
session.close();
}
if (conn != null) {
conn.close();
}
} catch (Exception e) {
log.info(" = = = = = = = = = 关闭消息连接时异常:" + e.toString());
e.printStackTrace();
}
}

}

}


客户端代码:

注意:需要在项目的 Web.xml 中配置监听器来监听服务器保持连接状态,保证以下代码自启动,目的就是在 Tomcat 服务器启动时就能实现消息接收。

package com.etc.send.data.controller;

import java.util.Properties;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.jms.*;
import javax.naming.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.tongtech.jms.FileMessage;
import com.etc.action.MsgSenderAction;
import com.etc.action.UserAction;
import com.etc.utils.ConfigInfo;

/**
* TongLing/Q 客户端
* <p>
*
* @ClassName : QmController
*            </p>
*            <p>
* @Description : TODO
*              </p>
*              <p>
* @Author : HuaZai
*         </p>
*         <p>
* @ContactInformation : 1461522031@qq.com/huazai6789@aliyun.com
*                     </p>
*
* @Date : 2018年1月22日 上午11:57:22
* @Version : V1.0.0
*  */
public class QmCont
e01f
roller extends HttpServlet {

// 定义变量
private static final String tcf = ConfigInfo.getValue("JMSCONTEXTFACTORY");
private static final String remoteURL = ConfigInfo.getValue("REMOTEURL");
private static final String remoteFactory = ConfigInfo.getValue("REMOTEFACTORY");
private static final String QUEUENAME = ConfigInfo.getValue("QUEUENAME");
private static final Logger log = LoggerFactory.getLogger(QmController.class);

@Override
public void init() throws ServletException {

ConnectionFactory connFactory = null;
Connection conn = null;
Session session = null;
Queue queue = null;
MessageConsumer consumer = null;

try {

Properties pro = new Properties();
pro.setProperty("java.naming.factory.initial", tcf);
pro.setProperty("java.naming.provider.url", remoteURL);

Context context = new javax.naming.InitialContext(pro);
connFactory = (ConnectionFactory) context.lookup(remoteFactory);

conn = connFactory.createConnection();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
queue = (Queue) context.lookup(QUEUENAME);
consumer = session.createConsumer(queue);
conn.start();
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
if (message != null) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
log.info(" = = = = = = = = = 收到一条Text消息:" + textMessage.getText());
log.info(" = = = = = = = = = 来自 " + QUEUENAME + " :" + textMessage);
String resuleMsg = UserAction.handle(textMessage.getText());
// 将处理结果返回给认证中心
MsgSenderAction.send(resuleMsg);
} else if (message instanceof MapMessage) {
log.info(" = = = = = = = = = 收到一条Map消息 ");
} else if (message instanceof StreamMessage) {
log.info(" = = = = = = = = = 收到一条Text消息 ");
} else if (message instanceof BytesMessage) {
log.info(" = = = = = = = = = 收到一条Bytes消息 ");
} else if (message instanceof ObjectMessage) {
log.info(" = = = = = = = = = 收到一条Object消息 ");
} else if (message instanceof FileMessage) {
log.info(" = = = = = = = = = 收到一条文件消息 ");
}
} else {
log.info(" = = = = = = = = = 消息为空 ");
}
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (Exception e) {
System.out.println("Exception oxxurred :" + e.toString());
log.info(" = = = = = = = = = 消息监听器异常: " + e.toString());
e.printStackTrace();
}
}
}


Web.xml

在项目的核心配置文件 web.xml 中新增节点 “ servlet ”;

<servlet>
<servlet-name>aos</servlet-name>
<servlet-class>com.etc.send.data.controller.QmController</servlet-class>
<load-on-startup>2</load-on-startup>
</servlet>


新增 servlet 节点后,在项目启动的时候,就可以监听 TongLing/Q ,并且实现消息接收了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息