您的位置:首页 > 其它

mq的链接 消息读取与存放

2016-01-06 09:58 459 查看
上一节讲了 MQ的创建,这节主要讲解的是MQ的链接方式,我们在回顾以前讲解的一下东西,JMS(Java Message Services( JMS)是消息发送的标准 API为 Java 程序与对消息发送系统对象进行各种操作的消息发送系统进行互动提供了一个常见的模型,有自己本身的方法去链接MQ)
MQ (消息中间键,先进先出,通常存储以队列形式,主题不经常用到,这里提到的mq主要是ibm开发的,因此有自己独有的包去处理发送接收消息)

下边主要提供实现方法JMS和MQ两种,根据代码来扩展我们的知识点和知识面,下边主要根据代码的实现去扩展我们的知识点和知识面

JMS:

public class MsgQueueSender {

/**
* @功能:JMS中实现点对点消息服务--发送消息
* @作者:
* @日期:2012-10-17
*/

private MessageProducer sender;
private TextMessage msg;

public MsgQueueSender(String[] argv) throws NamingException, JMSException {
/* 初始化上下文对象 */
String url = "file:/F:/bindings/";
Properties p = new Properties();
p.put(Context.INITIAL_CONTEXT_FACTORY,
"com.sun.jndi.fscontext.RefFSContextFactory");
p.put(Context.PROVIDER_URL, url);
Context ctx = new InitialContext(p);

/* 创建一个连接工厂 */
ConnectionFactory qConFactory =(ConnectionFactory) ctx.lookup("TestFactory");

/* 创建连接 */
Connection qCon = qConFactory.createConnection();

/* 创建一个队列 */
Queue messageQueue = (Queue) ctx.lookup("Q.TEST.IN");

/* 创建一个会话 */
Session session  = qCon.createSession(false, Session.AUTO_ACKNOWLEDGE);

/* 创建一个发送者 */
sender = session.createProducer(messageQueue);

/* 创建一个消息 */
msg = session.createTextMessage();

}

public void runClient(String str) throws JMSException {
/* 设置消息,并发送 */
msg.setText("Hello");
sender.send(msg);
msg.setText("Welcome to JMS");
sender.send(msg);
msg.setText(str);
sender.send(msg);
}

public static void main(String[] args) throws Exception {
try {
MsgQueueSender mqs = new MsgQueueSender(args);
mqs.runClient("aaa");

} catch (NamingException e) {
System.err.println("");
System.err.println("**请确保已经正确地设置JMS服务器。在运行之前必须配置JMS服务器和正确的JMS目的。");
System.err.println("");
throw e;

}
}

}


先来个最简单的收发消息

MQ

public class MQSender{
private MQQueueManager qMgr;
private void getConnMQmanager() {
MQEnvironment.hostname = "127.0.0.1";// MQ服务器IP
MQEnvironment.channel = "CHAN_TEST";     // 队列管理器对应的服务器连接通道
MQEnvironment.CCSID = 1381;            // 字符编码
MQEnvironment.port = 1415;     // 队列管理器的端口号
try {
qMgr = new MQQueueManager("TEST");// 队列管理器名称
} catch (MQException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

private void closeConnMQmanager() {
if (qMgr != null) {
try {
qMgr.close();
} catch (MQException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

public void sendMsg(String msgStr){
int openOptions = MQC.MQOO_INPUT_AS_Q_DEF|MQC.MQOO_OUTPUT|MQC.MQOO_INQUIRE;
MQQueue queue = null;
try {
//建立Q1通道的连接
queue = qMgr.accessQueue("Q.TEST.IN", openOptions, null, null,null);
MQMessage msg = new MQMessage();// 要写入队列的消息
msg.format = MQC.MQFMT_STRING;
msg.characterSet = 1208;
msg.writeObject(msgStr); //将消息写入消息对象中
MQPutMessageOptions pmo = new MQPutMessageOptions();
// msg.expiry = -1;    // 设置消息用不过期
queue.put(msg, pmo);// 将消息放入队列
} catch (MQException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
if(queue!=null){
try {
queue.close();
} catch (MQException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}

public static void main(String[] args) throws JMSException,
InterruptedException {
MQSender mm = new MQSender();
mm.getConnMQmanager();
try {
for(int i=0;i<1;i++){
mm.sendMsg("AAAAAAAAAAAAAAAAAAAA");
System.out.println("写入数据"+"  "+i);
}
} catch (Exception e) {
e.printStackTrace();
}
mm.closeConnMQmanager();
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: