您的位置:首页 > 其它

MQ 文件传输

2009-12-13 19:48 190 查看
对于MQ传输文件,我采取的思路是:
A.先定义一个可序列化的类(暂命名为 FileBean),类的属性有 fileName 和 fileContent 两个。
B.用输入流配合 BASE64Encoder 将文件格式化为 基于BASE64Encoder 的String编码 作为文件的内容。
C.将文件名和文件内容分别 set 到 FileBean 的 fileName 和 fileContent 属性中。
D.调用MQ将这个Object写到远程队列中去。
E.接收方接收到消息时用 readObject() 方法读出,强转成 FileBean
F.从fileBean中取出文件名和文件内容,将文件内容用BASE64Decoder解码
G.用文件输出流将文件写到指定的位置,到此大功告成。

-------------------------废话少说,看代码-------------------------------
文件序列化类

package com.test.mq;

import java.io.Serializable;

/**
 * Serializable carrier for a single file: its name plus its content as text.
 *
 * <p>Both properties start out as the empty string. The content is expected
 * to hold the BASE64Encoder output for the file's bytes.
 *
 * @author listening
 */
public class FileBean implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Name of the file. */
    private String fileName = "";

    /** Content of the file, after BASE64Encoder encoding. */
    private String fileContent = "";

    /**
     * @return the file name
     */
    public String getFileName() {
        return this.fileName;
    }

    /**
     * @return the encoded file content
     */
    public String getFileContent() {
        return this.fileContent;
    }

    /**
     * @param name the file name to store
     */
    public void setFileName(String name) {
        this.fileName = name;
    }

    /**
     * @param content the encoded file content to store
     */
    public void setFileContent(String content) {
        this.fileContent = content;
    }

}

package com.test.mq;

import java.io.Serializable;
/**
 * Serializable carrier for a single file: its name plus its content as text.
 *
 * <p>Both properties start out as the empty string. The content is expected
 * to hold the BASE64Encoder output for the file's bytes.
 *
 * @author listening
 */
public class FileBean implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Name of the file. */
    private String fileName = "";

    /** Content of the file, after BASE64Encoder encoding. */
    private String fileContent = "";

    /**
     * @return the file name
     */
    public String getFileName() {
        return this.fileName;
    }

    /**
     * @return the encoded file content
     */
    public String getFileContent() {
        return this.fileContent;
    }

    /**
     * @param name the file name to store
     */
    public void setFileName(String name) {
        this.fileName = name;
    }

    /**
     * @param content the encoded file content to store
     */
    public void setFileContent(String content) {
        this.fileContent = content;
    }

}


文件发送和接收类

package com.test.mq;

import java.io.File;

import java.io.FileInputStream;

import java.io.FileOutputStream;

import java.io.InputStream;

import java.text.SimpleDateFormat;

import java.util.Date;

import sun.misc.BASE64Decoder;

import sun.misc.BASE64Encoder;

import com.ibm.mq.MQC;

import com.ibm.mq.MQEnvironment;

import com.ibm.mq.MQException;

import com.ibm.mq.MQGetMessageOptions;

import com.ibm.mq.MQMessage;

import com.ibm.mq.MQPutMessageOptions;

import com.ibm.mq.MQQueue;

import com.ibm.mq.MQQueueManager;

/**
 * Sends a file to, and receives a file from, an IBM MQ queue.
 *
 * <p>A file travels as one MQ message holding a serialized {@link FileBean}:
 * the file name plus the file bytes encoded as Base64 text. Files are read
 * from {@code D://} when sending and written to {@code E://} when receiving.
 *
 * <p>Connection settings are the static fields below. Each send/receive call
 * connects on demand and releases the queue and the queue manager when done.
 * Failures are reported with {@code printStackTrace()} and are not rethrown.
 *
 * @author listening
 */
public class MQSendAndReceiveUtil {

    private MQQueueManager qManager;

    private MQQueue queue;

    private static String qmManager = "QM_00000000"; // queue manager name

    private static String remoteQName = "RQ_88888888"; // remote queue, opened for sending

    private static String localQName = "LQ_00000000"; // local queue, opened for receiving

    private static String hostname = "192.168.1.66"; // host assigned to MQEnvironment.hostname

    private static String channel = "DC.SVRCONN"; // server-connection channel

    private static int ccsid = 1381;

    private static int port = 1414;

    /**
     * Configures {@link MQEnvironment} and tries to connect to the queue
     * manager. A failed connection is reported but does not throw; the
     * send/receive methods retry the connection before they run.
     */
    private MQSendAndReceiveUtil() {
        connect();
    }

    /**
     * Applies the static connection settings to {@link MQEnvironment} and
     * stores a new queue manager connection in {@link #qManager}. On failure
     * the exception is printed and {@code qManager} is left unchanged.
     */
    @SuppressWarnings("unchecked")
    private void connect() {
        MQEnvironment.hostname = hostname;
        MQEnvironment.channel = channel;
        MQEnvironment.CCSID = ccsid;
        MQEnvironment.port = port;
        MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,
                MQC.TRANSPORT_MQSERIES);
        try {
            qManager = new MQQueueManager(qmManager);
        } catch (MQException e) {
            e.printStackTrace();
        }
    }

    /**
     * Connects to the queue manager if this instance has no connection.
     *
     * <p>The connection is established on this instance. Constructing a
     * second {@code MQSendAndReceiveUtil} here would leave this instance's
     * {@code qManager} null.
     */
    private void createConnection() {
        if (qManager == null) {
            connect();
        }
    }

    /**
     * Sends the file {@code D://<fileName>} to the remote queue as a
     * serialized {@link FileBean}.
     *
     * <p>Errors are printed, not thrown. The queue and the queue manager are
     * always released before returning.
     *
     * @param fileName name of the file under {@code D://}; also stored in the
     *                 message as the file name
     */
    public void sendFileMessage(String fileName) {
        this.createConnection();
        if (qManager == null) {
            System.err.println("No connection to queue manager " + qmManager
                    + "; file not sent");
            return;
        }
        try {
            // Read the file before opening the queue so a missing file fails early.
            byte[] data = readFully(new File("D://" + fileName));

            FileBean file = new FileBean();
            file.setFileName(fileName);
            file.setFileContent(new BASE64Encoder().encode(data));

            int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
            // When sending, the queue is the sender's remote queue definition.
            queue = qManager.accessQueue(remoteQName, openOptions, null, null,
                    null);

            MQMessage message = new MQMessage();
            message.writeObject(file);
            queue.put(message, new MQPutMessageOptions());
            qManager.commit();
            this.logInfo("文件发送成功");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            this.closeAction();
        }
    }

    /**
     * Takes one message from the local queue, decodes the {@link FileBean} it
     * carries and writes the file to {@code E://<file name>}.
     *
     * <p>The message is read under syncpoint: it is committed only after the
     * file has been written and backed out if anything fails. Errors are
     * printed, not thrown. The queue and the queue manager are always
     * released before returning.
     */
    public void receiveFileMessage() {
        this.createConnection();
        if (qManager == null) {
            System.err.println("No connection to queue manager " + qmManager
                    + "; nothing received");
            return;
        }
        try {
            int openOptions = MQC.MQOO_INPUT_SHARED
                    | MQC.MQOO_FAIL_IF_QUIESCING;
            // When receiving, the queue is the receiver's local queue.
            queue = qManager.accessQueue(localQName, openOptions, null, null,
                    null);

            MQGetMessageOptions gmo = new MQGetMessageOptions();
            // Flags are OR-ed so a bit already set in the default is not added twice.
            gmo.options = gmo.options | MQC.MQGMO_SYNCPOINT | MQC.MQGMO_WAIT
                    | MQC.MQGMO_FAIL_IF_QUIESCING;
            gmo.waitInterval = 100; // NOTE(review): milliseconds per the IBM MQ docs - confirm

            MQMessage inMsg = new MQMessage();
            queue.get(inMsg, gmo);

            FileBean fileBean = (FileBean) inMsg.readObject();
            byte[] contentArray = new BASE64Decoder().decodeBuffer(fileBean
                    .getFileContent());

            // The name comes from the sender: keep only the last path element
            // so it cannot point outside E://.
            String safeName = new File(fileBean.getFileName()).getName();
            if (safeName.length() == 0 || ".".equals(safeName)
                    || "..".equals(safeName)) {
                throw new java.io.IOException("Invalid file name in message: "
                        + fileBean.getFileName());
            }
            String path = "E://" + safeName;

            FileOutputStream out = new FileOutputStream(new File(path));
            try {
                out.write(contentArray, 0, contentArray.length);
            } finally {
                out.close();
            }

            qManager.commit();
            this.logInfo("文件接收成功,请注意查收");
        } catch (Exception e) {
            e.printStackTrace();
            backoutQuietly();
        } finally {
            this.closeAction();
        }
    }

    /**
     * Releases the open queue and the queue manager connection, if any.
     *
     * <p>The two are released independently, so a failure closing the queue
     * does not leave the queue manager connected. Errors are printed.
     */
    public void closeAction() {
        if (queue != null) {
            try {
                queue.close();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                queue = null;
            }
        }
        if (qManager != null) {
            try {
                qManager.disconnect();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                qManager = null;
            }
        }
    }

    /**
     * Prints a timestamped log line to standard output.
     *
     * @param message text to log
     */
    public void logInfo(String message) {
        // "MM" is month-of-year; lowercase "mm" would print the minute there.
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println(format.format(new Date()) + "-------" + message
                + "+-------------");
    }

    /**
     * Manual test entry point: sends {@code D://test.xml}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        new MQSendAndReceiveUtil().sendFileMessage("test.xml");
        // new MQSendAndReceiveUtil().receiveFileMessage();
    }

    /** Backs out the current unit of work; a failure to do so is only printed. */
    private void backoutQuietly() {
        if (qManager == null) {
            return;
        }
        try {
            qManager.backout();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads a whole file into memory. Loops until the buffer is full because
     * a single {@code read} may return fewer bytes than requested.
     */
    private static byte[] readFully(File source) throws java.io.IOException {
        long length = source.length();
        if (length > Integer.MAX_VALUE) {
            throw new java.io.IOException("File too large to send: " + source);
        }
        byte[] data = new byte[(int) length];
        InputStream in = new FileInputStream(source);
        try {
            int offset = 0;
            while (offset < data.length) {
                int count = in.read(data, offset, data.length - offset);
                if (count < 0) {
                    throw new java.io.EOFException("Unexpected end of file: "
                            + source);
                }
                offset += count;
            }
        } finally {
            in.close();
        }
        return data;
    }

}

package com.test.mq;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.text.SimpleDateFormat;
import java.util.Date;

import sun.misc.BASE64Decoder;
import sun.misc.BASE64Encoder;

import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;

/**
 * Sends a file to, and receives a file from, an IBM MQ queue.
 *
 * <p>A file travels as one MQ message holding a serialized {@link FileBean}:
 * the file name plus the file bytes encoded as Base64 text. Files are read
 * from {@code D://} when sending and written to {@code E://} when receiving.
 *
 * <p>Connection settings are the static fields below. Each send/receive call
 * connects on demand and releases the queue and the queue manager when done.
 * Failures are reported with {@code printStackTrace()} and are not rethrown.
 *
 * @author listening
 */
public class MQSendAndReceiveUtil {

    private MQQueueManager qManager;

    private MQQueue queue;

    private static String qmManager = "QM_00000000"; // queue manager name

    private static String remoteQName = "RQ_88888888"; // remote queue, opened for sending

    private static String localQName = "LQ_00000000"; // local queue, opened for receiving

    private static String hostname = "192.168.1.66"; // host assigned to MQEnvironment.hostname

    private static String channel = "DC.SVRCONN"; // server-connection channel

    private static int ccsid = 1381;

    private static int port = 1414;

    /**
     * Configures {@link MQEnvironment} and tries to connect to the queue
     * manager. A failed connection is reported but does not throw; the
     * send/receive methods retry the connection before they run.
     */
    private MQSendAndReceiveUtil() {
        connect();
    }

    /**
     * Applies the static connection settings to {@link MQEnvironment} and
     * stores a new queue manager connection in {@link #qManager}. On failure
     * the exception is printed and {@code qManager} is left unchanged.
     */
    @SuppressWarnings("unchecked")
    private void connect() {
        MQEnvironment.hostname = hostname;
        MQEnvironment.channel = channel;
        MQEnvironment.CCSID = ccsid;
        MQEnvironment.port = port;
        MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,
                MQC.TRANSPORT_MQSERIES);
        try {
            qManager = new MQQueueManager(qmManager);
        } catch (MQException e) {
            e.printStackTrace();
        }
    }

    /**
     * Connects to the queue manager if this instance has no connection.
     *
     * <p>The connection is established on this instance. Constructing a
     * second {@code MQSendAndReceiveUtil} here would leave this instance's
     * {@code qManager} null.
     */
    private void createConnection() {
        if (qManager == null) {
            connect();
        }
    }

    /**
     * Sends the file {@code D://<fileName>} to the remote queue as a
     * serialized {@link FileBean}.
     *
     * <p>Errors are printed, not thrown. The queue and the queue manager are
     * always released before returning.
     *
     * @param fileName name of the file under {@code D://}; also stored in the
     *                 message as the file name
     */
    public void sendFileMessage(String fileName) {
        this.createConnection();
        if (qManager == null) {
            System.err.println("No connection to queue manager " + qmManager
                    + "; file not sent");
            return;
        }
        try {
            // Read the file before opening the queue so a missing file fails early.
            byte[] data = readFully(new File("D://" + fileName));

            FileBean file = new FileBean();
            file.setFileName(fileName);
            file.setFileContent(new BASE64Encoder().encode(data));

            int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
            // When sending, the queue is the sender's remote queue definition.
            queue = qManager.accessQueue(remoteQName, openOptions, null, null,
                    null);

            MQMessage message = new MQMessage();
            message.writeObject(file);
            queue.put(message, new MQPutMessageOptions());
            qManager.commit();
            this.logInfo("文件发送成功");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            this.closeAction();
        }
    }

    /**
     * Takes one message from the local queue, decodes the {@link FileBean} it
     * carries and writes the file to {@code E://<file name>}.
     *
     * <p>The message is read under syncpoint: it is committed only after the
     * file has been written and backed out if anything fails. Errors are
     * printed, not thrown. The queue and the queue manager are always
     * released before returning.
     */
    public void receiveFileMessage() {
        this.createConnection();
        if (qManager == null) {
            System.err.println("No connection to queue manager " + qmManager
                    + "; nothing received");
            return;
        }
        try {
            int openOptions = MQC.MQOO_INPUT_SHARED
                    | MQC.MQOO_FAIL_IF_QUIESCING;
            // When receiving, the queue is the receiver's local queue.
            queue = qManager.accessQueue(localQName, openOptions, null, null,
                    null);

            MQGetMessageOptions gmo = new MQGetMessageOptions();
            // Flags are OR-ed so a bit already set in the default is not added twice.
            gmo.options = gmo.options | MQC.MQGMO_SYNCPOINT | MQC.MQGMO_WAIT
                    | MQC.MQGMO_FAIL_IF_QUIESCING;
            gmo.waitInterval = 100; // NOTE(review): milliseconds per the IBM MQ docs - confirm

            MQMessage inMsg = new MQMessage();
            queue.get(inMsg, gmo);

            FileBean fileBean = (FileBean) inMsg.readObject();
            byte[] contentArray = new BASE64Decoder().decodeBuffer(fileBean
                    .getFileContent());

            // The name comes from the sender: keep only the last path element
            // so it cannot point outside E://.
            String safeName = new File(fileBean.getFileName()).getName();
            if (safeName.length() == 0 || ".".equals(safeName)
                    || "..".equals(safeName)) {
                throw new java.io.IOException("Invalid file name in message: "
                        + fileBean.getFileName());
            }
            String path = "E://" + safeName;

            FileOutputStream out = new FileOutputStream(new File(path));
            try {
                out.write(contentArray, 0, contentArray.length);
            } finally {
                out.close();
            }

            qManager.commit();
            this.logInfo("文件接收成功,请注意查收");
        } catch (Exception e) {
            e.printStackTrace();
            backoutQuietly();
        } finally {
            this.closeAction();
        }
    }

    /**
     * Releases the open queue and the queue manager connection, if any.
     *
     * <p>The two are released independently, so a failure closing the queue
     * does not leave the queue manager connected. Errors are printed.
     */
    public void closeAction() {
        if (queue != null) {
            try {
                queue.close();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                queue = null;
            }
        }
        if (qManager != null) {
            try {
                qManager.disconnect();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                qManager = null;
            }
        }
    }

    /**
     * Prints a timestamped log line to standard output.
     *
     * @param message text to log
     */
    public void logInfo(String message) {
        // "MM" is month-of-year; lowercase "mm" would print the minute there.
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println(format.format(new Date()) + "-------" + message
                + "+-------------");
    }

    /**
     * Manual test entry point: sends {@code D://test.xml}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        new MQSendAndReceiveUtil().sendFileMessage("test.xml");
        // new MQSendAndReceiveUtil().receiveFileMessage();
    }

    /** Backs out the current unit of work; a failure to do so is only printed. */
    private void backoutQuietly() {
        if (qManager == null) {
            return;
        }
        try {
            qManager.backout();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads a whole file into memory. Loops until the buffer is full because
     * a single {@code read} may return fewer bytes than requested.
     */
    private static byte[] readFully(File source) throws java.io.IOException {
        long length = source.length();
        if (length > Integer.MAX_VALUE) {
            throw new java.io.IOException("File too large to send: " + source);
        }
        byte[] data = new byte[(int) length];
        InputStream in = new FileInputStream(source);
        try {
            int offset = 0;
            while (offset < data.length) {
                int count = in.read(data, offset, data.length - offset);
                if (count < 0) {
                    throw new java.io.EOFException("Unexpected end of file: "
                            + source);
                }
                offset += count;
            }
        } finally {
            in.close();
        }
        return data;
    }

}


至此,文件发送接收完成。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: