您的位置:首页 > 其它

如何使用MQ标头动态确定数据处理程序的行为?

2012-08-02 22:35 302 查看
本文的目标读者是集成开发人员,其角色涉及使用服务组件体系结构(Service Component Architecture,SCA)集成MQ消息传递引擎。本文将描述如何使用数据处理程序基于MQ标头更改转换逻辑,同时确保处理程序保持协议独立性。涵盖的主题包括:

  数据处理程序的解释

  从数据处理程序访问MQ标头

  基于标头改变数据处理程序的行为

  确保数据处理程序保持协议独立性

  在阅读本文之后,您将能够创作基于MQ标头动态地转换数据的可重用数据处理程序。

  数据处理程序

  数据处理程序负责将操作的输入和输出类型转换为有线使用的格式,反之亦然。以前,MQ绑定使用了数据绑定执行此功能。数据绑定具有访问有线格式(如MQ消息)的权限,并可以直接对消息进行读取或写入。在JMS绑定上无法使用这种连接到特定绑定类型(例如MQ数据绑定)的每个数据绑定。

  数据处理程序提供了使用规范格式转换数据的通用方法。transform方法用于读取数据,并传递以下类型之一来读取输入:

  InputStream-用于读取字节

  Reader-用于读取文本

  Object-用于读取Java对象

  transformInto方法用于写入数据,并传入以下类型之一来写入输出:

  OutputStream-用于写入字节

  Writer-用于写入文本

  Object-用于写入Java对象

  这样可使消息的有线格式对数据处理程序透明,允许任何受支持的绑定重用它们。

  数据处理程序和消息标头

  当创作数据绑定时,会将消息标头传入读或写方法,并允许协议标头中的字段确定转换逻辑。不过,对于方法的输入参数不包括协议标头的数据处理程序,我们应如何访问它们?

  在WebSphere Enterprise Service Bus (WESB) V6.2中,协议标头存储在名为ContextService的存储库中。ContextService通过使用普通SPI进行访问,并且可以从任何Java代码引用,其中包括POJO、函数选择器或数据处理程序。清单 1 显示了从ContextService获取MQHeader的示例代码。

  清单 1. 使用上下文服务访问协议标头

以下是引用片段:

HeadersType headers = ContextService.INSTANCE.getHeaders();

MQHeaderType mqHeader = headers.getMQHeader();
 

  使用数据处理程序中的MQHeader

  现在,我们已了解了如何访问数据处理程序中的MQ Header,让我们基于这些字段之一制定转换决策。MQMD的格式字段通常用于确定消息正文格式。消息正文的格式、编码和编码字符集存储在MQControl结构中,并包含在MQ Header中。清单 2 中所示的代码说明了如何根据MQMD中格式字段的值修改转换逻辑。

  清单 2. 基于MQMD格式字段更改行为

以下是引用片段:

package com.ibm.custom.datahandlers;

import java.io.IOException;

import java.io.InputStream;

import java.io.OutputStream;

import java.util.Map;

import com.ibm.websphere.bo.BOFactory;

import com.ibm.websphere.bo.BOXMLDocument;

import com.ibm.websphere.bo.BOXMLSerializer;

import com.ibm.websphere.sca.ServiceManager;

import com.ibm.websphere.sca.mq.structures.MQControl;

import com.ibm.websphere.sibx.smobo.HeadersType;

import com.ibm.websphere.sibx.smobo.MQHeaderType;

import com.ibm.wsspi.session.ContextService;

import commonj.connector.runtime.DataHandler;

import commonj.connector.runtime.DataHandlerException;

import commonj.sdo.DataObject;

@SuppressWarnings("serial")

public class MQHeaderDataHandler implements DataHandler {

Map bindingContext;

String delimiter = "%";

/**

* Reads a DataObject from an InputStream (Bytes support only)

*/

public Object transform(Object source, Class target, Object options)

throws DataHandlerException {

System.out.println("MyDataHandler: Calling

transform(" + source + "," + target + "," + options + ")");

DataObject output = null;

InputStream is = (InputStream) source;

//Handle InputStreams and DataObjects

if (source instanceof InputStream &&

target.getName().equals("commonj.sdo.DataObject")) {

if (("MQXML").equals(getMQMessageFormat())) {

//If the format field of the incoming message

is XML use BOXMLSerializer

ServiceManager serviceMgr = new ServiceManager();

BOXMLSerializer xmlSerializer =

(BOXMLSerializer) serviceMgr

.locateService

("com/ibm/websphere/bo/BOXMLSerializer");

try {

BOXMLDocument xmlDoc = xmlSerializer.

readXMLDocumentWithOptions

(is , options);

output = xmlDoc.getDataObject();

((InputStream)source).close();

} catch (IOException e) {

throw new

DataHandlerException

("Exception reading DataObject "

+ e.getLocalizedMessage());

}

} else {

//Read delimited data

byte[] ch = new byte[2];

StringBuffer buffer = new StringBuffer();

try {

int len = is.read(ch);

while (len != -1) {

buffer.append(new String(ch), 0, len);

len = is.read(ch);

}

} catch (IOException e) {

throw new DataHandlerException(e);

}

//Split the values using the delimiter

String[] values = buffer.toString().split(delimiter);

System.out.println("Found name: "+values[0]+" and id: "

+values[1]);

//Build the CustomerType

ServiceManager serviceManager = new ServiceManager();

BOFactory bofactory =

(BOFactory) serviceManager.locateService

("com/ibm/websphere/bo/BOFactory");

output =

bofactory.create("http://

MQHeaderDataHandlerModule", "CustomerType");

output.setString("name", values[0]);

output.setInt("id", Integer.parseInt(values[1]));

}

} else {

throw new

DataHandlerException("Source type " +

source.getClass().getName() + " or Target type "

+ target.getName() + " unsupported by

MyDataHandler");

}

return output;

}

/**

* Writes the DataObject out to a OutputStream (Bytes support only)

*/

public void transformInto(Object source, Object target, Object options)

throws DataHandlerException {

System.out.println("Calling

transformInto(" + source + "," + target + "," + options + ")");

if (source instanceof DataObject && target instanceof OutputStream) {

OutputStream os = (OutputStream) target;

DataObject bo = (DataObject) source;

if (("MQXML").equals(getMQMessageFormat())) {

//If the format field of the outgoing message is XML use

BOXMLSerializer

ServiceManager serviceMgr = new ServiceManager();

BOXMLSerializer xmlSerializer = (BOXMLSerializer)

serviceMgr.locateService("com/ibm/websphere/

bo/BOXMLSerializer");

try {

xmlSerializer.writeDataObject(bo, bo.getType().

getURI(), bo.getType().getName(), os);

} catch (IOException e) {

throw new

DataHandlerException("Exception writing

DataObject " + e.getLocalizedMessage());

}

} else {

//Write delimited data

String name = bo.getString("name");

int id = bo.getInt("id");

String message = name+delimiter+id;

try {

os.write(message.getBytes());

} catch (IOException e) {

throw new

DataHandlerException("Exception writing

DataObject " + e.getLocalizedMessage());

}

}

} else {

throw new

DataHandlerException("Source type " + source.getClass()

.getName() + " or Target type "

+ target.getClass().getName() +

" unsupported by MyDataHandler");

}

}

/**

* Returns the format of the MQHeader or null if there is no MQHeader

* @return

*/

private String getMQMessageFormat() {

String format = null;

HeadersType headers = ContextService.INSTANCE.getHeaders();

MQHeaderType mqHeader = null;

if(headers!= null){

mqHeader = headers.getMQHeader();

if (mqHeader != null) {

MQControl mqControl = mqHeader.getControl();

if (mqControl != null) {

format = mqControl.getFormat();

if (format != null) {

format = format.trim();

}

}

}

}

return format;

}

public void setBindingContext(Map context) {

bindingContext = context;

}

}
  此处理程序使用MQHeader格式字段,如果将其设置为MQXML,则会将消息正文视为XML,否则会将消息正文视为带分隔符的数据。如果将此数据处理程序与非MQ绑定一起使用,则不能在 ContextService 中定义 MQHeader。在这种情况下,getMQMessageFormat()方法将返回null,并导致将正文视为带分隔符的数据,因此,数据处理程序应保持协议独立性。

  测试数据处理程序

  首先,使用MQ绑定创建一个简单的模块。假定已安装WebSphere MQ,并且可以在WebSphere ESB中访问它。

  在WID中创建名为MQHeaderDataHandlerModule的新模块

  在MQHeaderDataHandlerModule项目中,创建名为CustomerType的新数据类型



图 1. CustomerType Business对象

添加名为name的String字段和名为id的int字段

  保存并关闭新数据类型

  创建名为CustomerInterface的新接口

  添加名为processCustomer的单向操作,它可以接受CustomerType作为输入参数



图 2. Customer接口

将新的导出添加到组装图

  将CustomerInterface添加到导出

  在导出上配置MQ绑定

  设置请求队列管理器名称

  设置接收目标

  为WebSphere MQ队列管理器设置适当的连接详细信息

  要获取缺省的请求数据格式,请单击select

  选择Select your custom data format transformation from the workspace,并单击select

  选择MQHeaderDataHandler,并单击ok

  勾选Add custom class to binding registry,并单击next(此操作可以在现有数据格式列表中显示)

  输入名称MQHeaderDataHandler

  添加描述

  选择它适用的绑定类型,现在勾选MQ和JMS



图 3. 设置数据格式

在导入上,按照上述同一数据格式配置MQ绑定

  为确保正确地创建DataObject,我们将使用Java组件将其输出,因此将一个数据对象添加到画布

  将Java组件连接到导出和导入,您的模块现在应该与图 4 所示类似



图 4. MQHeaderDataHandlerModule

  双击Java组件创建一个实现

  将processCustomer的实现替换为清单 3 中的代码

  清单 3. 打印数据对象

以下是引用片段:

BOFactory factoryService = (BOFactory) new

ServiceManager().locateService("com/ibm/websphere/bo/BOFactory");

BOXMLSerializer xmlSerializerService =(BOXMLSerializer) new

ServiceManager().locateService("com/ibm/websphere/bo/BOXMLSerializer");

try {

xmlSerializerService.writeDataObject(input1, input1.getType().getURI(),

input1.getType().getName(), System.out);

} catch (IOException e) {

e.printStackTrace();

}

locateService_CustomerInterfacePartner().processCustomer(input1);
  保存Java组件

  保存模块

  运行模块

  为测试模块,我们将使用RFHUtil,它允许我们创建MQ消息,并将其放入导出接收目标。然后,我们可以使用WID中的服务器日志视图查看由Java组件输出的DataObject,并再次使用RFHUtil查看导入输出的消息。

  在MQHeaderDataHandlerModule项目中创建一个名为CustomerType.xml的文件

  将内容设置为清单 4 中显示的 XML

  清单 4. CustomerType XML数据

以下是引用片段:

<?xml version="1.0" encoding="UTF-8"?>

<p:CustomerType xmlns:p="http://MQHeaderDataHandlerModule"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://MQHeaderDataHandlerModule CustomerType.xsd ">

<name>Tim</name>

<id>114923</id>

</p:CustomerType>

  在MQHeaderDataHandlerModule项目中创建一个名为CustomerType.txt的文件

将内容设置为清单 5 中显示的XML

  清单 5. CustomerType带分隔符的数据

  
以下是引用片段:

Tim%114923
  将您的模块部署到服务器

  启动RFHUtil,并设置导出使用的队列管理器和队列

  使用Read File 读取 CustomerType.txt,您应该能够看到数据选项卡下的消息正文

  按WriteQ按钮将消息发送到队列

  选中WebSphere Integration Developer中的Server Logs视图,以便输出DataObject

  回到RFHUtil中,将队列更改为导入使用的队列,并单击ReadQ

  转到Data选项卡,看到的消息正文应该与图5所示类似



图 5. 输出带分隔符的消息

  转回RFHUtil中的Main选项卡

  使用Read File读取CustomerType.xml,您应该能够看到数据选项卡下的消息正文

  转到MQMD选项卡,并将消息格式设置为MQXML



图 6. 设置MQXML消息格式

  回到Main选项卡上,按WriteQ发送消息

  将队列更改为导入使用的队列,并单击ReadQ

  转到Data选项卡,看到的消息正文应该与图 7 所示类似



图 7. 输出XML消息

  结束语

  祝贺您,现在您已经成功地创建并测试了由MQ标头中的字段控制其行为的独立于协议的数据处理程序。我们可以看到,如果将格式字段设置为MQXML,则可以将消息视为XML。如果没有任何MQ标头,还可以将数据视为带分隔符的数据。请尝试将此数据处理程序与另一个绑定一起使用并检查其行为。

  在本文中,我们主要讨论了如何使用MQ格式字段更改数据处理程序逻辑,我们还可以在任何其他标头(如JMS标头或HTTP标头)中使用相应字段(如果处理这些协议)。值得注意的是,如果启用Propagate protocol header,则标头仅对数据处理程序可用,这是缺省设置。要查看设置,请选中绑定属性面板中的Propagation选项卡。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐