
Using the Spring Integration Java DSL with Spring Boot: Sample Code

2019-01-10 12:03

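The Spring Integration Java DSL has been merged into Spring Integration Core 5.0, which is a smart and obvious move, because:

  • Everyone who starts a new Spring project based on Java Config uses it
  • The SI Java DSL lets you use powerful new Java 8 features such as lambdas
  • You can build flows with the Builder pattern, based on IntegrationFlowBuilder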
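
The builder idea from the last bullet can be sketched in plain Java. The class below is a toy stand-in and not the Spring Integration API: MiniFlow, its Builder, and the transform/handle/send method names are invented here purely for illustration, chosen to echo how the DSL chains lambda steps into a flow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Toy flow assembled with a builder; a hypothetical class, NOT part of Spring Integration. */
final class MiniFlow {
    private final List<Function<Object, Object>> steps;
    private final Consumer<Object> handler;

    private MiniFlow(List<Function<Object, Object>> steps, Consumer<Object> handler) {
        this.steps = steps;
        this.handler = handler;
    }

    /** Pushes one payload through every transform step in order, then into the handler. */
    void send(Object payload) {
        Object current = payload;
        for (Function<Object, Object> step : steps) {
            current = step.apply(current);
        }
        handler.accept(current);
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private final List<Function<Object, Object>> steps = new ArrayList<>();
        private Consumer<Object> handler = payload -> { };

        /** Adds a typed transform step; the payload is cast to the declared input type. */
        @SuppressWarnings("unchecked")
        <I, O> Builder transform(Function<I, O> fn) {
            steps.add(payload -> fn.apply((I) payload));
            return this;
        }

        /** Sets the terminal handler that receives the final payload. */
        Builder handle(Consumer<Object> terminal) {
            this.handler = terminal;
            return this;
        }

        MiniFlow build() {
            return new MiniFlow(new ArrayList<>(steps), handler);
        }
    }
}
```

A call such as `MiniFlow.builder().<String, String>transform(String::toLowerCase).handle(System.out::println).build().send("HELLO")` reads much like the DSL flows later in this article: each step is a lambda or method reference, and the builder decides how they are wired together.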

Let's look at how to use it with examples based on ActiveMQ JMS.

Maven dependencies:

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jms</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-kahadb-store</artifactId>
</dependency>

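<!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-java-dsl -->
<!-- Needed only with Spring Integration 4.x; since 5.0 the DSL is part of spring-integration-core. -->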
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-java-dsl</artifactId>
<version>1.2.3.RELEASE</version>
</dependency>
</dependencies>

Sample 1: JMS inbound gateway

We have the following ServiceActivator:

@Service
public class ActiveMQEndpoint {
    @ServiceActivator(inputChannel = "inboundChannel")
    public void processMessage(final String inboundPayload) {
        System.out.println("Inbound message: " + inboundPayload);
    }
}

If you want to use the SI Java DSL to deliver the inboundPayload from a JMS queue to the activator in gateway style, use the DSL Jms factory:

@Bean
public DynamicDestinationResolver dynamicDestinationResolver() {
    return new DynamicDestinationResolver();
}

@Bean
public ActiveMQConnectionFactory connectionFactory() {
    return new ActiveMQConnectionFactory();
}

@Bean
public DefaultMessageListenerContainer listenerContainer() {
    final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
    defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
    defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
    defaultMessageListenerContainer.setDestinationName("jms.activeMQ.Test");
    return defaultMessageListenerContainer;
}

@Bean
public MessageChannel inboundChannel() {
    return MessageChannels.direct("inboundChannel").get();
}

@Bean
public JmsInboundGateway dataEndpoint() {
    return Jms.inboundGateway(listenerContainer())
            .requestChannel(inboundChannel()).get();
}

Through the JmsInboundGatewaySpec behind the dataEndpoint bean, you can also send replies to an SI channel or a JMS destination. See the documentation.

Sample 2: JMS message-driven channel adapter

If you are looking for a replacement for the XML JMS configuration of a message-driven channel adapter, JmsMessageDrivenChannelAdapter is the way to go:

@Bean
public DynamicDestinationResolver dynamicDestinationResolver() {
    return new DynamicDestinationResolver();
}

@Bean
public ActiveMQConnectionFactory connectionFactory() {
    return new ActiveMQConnectionFactory();
}

@Bean
public DefaultMessageListenerContainer listenerContainer() {
    final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
    defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
    defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
    defaultMessageListenerContainer.setDestinationName("jms.activeMQ.Test");
    return defaultMessageListenerContainer;
}

@Bean
public MessageChannel inboundChannel() {
    return MessageChannels.direct("inboundChannel").get();
}

@Bean
public JmsMessageDrivenChannelAdapter dataEndpoint() {
    final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
            new ChannelPublishingJmsMessageListener();
    // One-way delivery: the adapter does not wait for a reply from the flow.
    channelPublishingJmsMessageListener.setExpectReply(false);
    final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter =
            new JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener);

    messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
    return messageDrivenChannelAdapter;
}

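As in the previous example, the inbound payload is delivered to the activator exactly as in sample 1.

Sample 3: JMS message-driven channel adapter with JAXB

In a typical scenario you want to accept XML as a text message over JMS, convert it into a JAXB stub, and process it in a service activator. I will show you how to do this with the SI Java DSL, but first let's add two dependencies for XML processing: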

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-xml</artifactId>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
</dependency>

We will accept ship orders over JMS, so first comes the XSD, named shiporder.xsd:

<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">

<xs:element name="shiporder">
<xs:complexType>
<xs:sequence>
<xs:element name="orderperson" type="xs:string"/>
<xs:element name="shipto">
<xs:complexType>
<xs:sequence>
<xs:element name="name" type="xs:string"/>
<xs:element name="address" type="xs:string"/>
<xs:element name="city" type="xs:string"/>
<xs:element name="country" type="xs:string"/>
</xs:sequence>
</xs:complexType>
</xs:element>
<xs:element name="item" maxOccurs="unbounded">
<xs:complexType>
<xs:sequence>
<xs:element name="title" type="xs:string"/>
<xs:element name="note" type="xs:string" minOccurs="0"/>
<xs:element name="quantity" type="xs:positiveInteger"/>
<xs:element name="price" type="xs:decimal"/>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
<xs:attribute name="orderid" type="xs:string" use="required"/>
</xs:complexType>
</xs:element>

</xs:schema>

Add the JAXB Maven plugin to generate the JAXB stubs:

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>jaxb2-maven-plugin</artifactId>
<version>2.3.1</version>
<executions>
<execution>
<id>xjc-schema1</id>
<goals>
<goal>xjc</goal>
</goals>
<configuration>
<!-- XSD files to generate the JAXB stubs from. -->
<sources>
<source>src/main/resources/xsds/shiporder.xsd</source>
</sources>

<!-- Package name of the generated sources. -->
<packageName>com.example.stubs</packageName>
<outputDirectory>src/main/java</outputDirectory>
<clearOutputDir>false</clearOutputDir>
</configuration>
</execution>
</executions>
</plugin>

We now have the stub classes and everything else ready, so here is the Java DSL JMS message-driven adapter with the JAXB magic:

/**
 * Sample 3: Jms message driven adapter with JAXB
 */
@Bean
public JmsMessageDrivenChannelAdapter dataEndpoint() {
    final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
            new ChannelPublishingJmsMessageListener();
    channelPublishingJmsMessageListener.setExpectReply(false);
    // Unmarshal the incoming XML text message into a JAXB stub before it reaches the channel.
    channelPublishingJmsMessageListener.setMessageConverter(new MarshallingMessageConverter(shipOrdersMarshaller()));
    final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter =
            new JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener);

    messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
    return messageDrivenChannelAdapter;
}

@Bean
public Jaxb2Marshaller shipOrdersMarshaller() {
    Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
    // Package of the generated stubs, as set in the jaxb2-maven-plugin configuration.
    marshaller.setContextPath("com.example.stubs");
    return marshaller;
}

Replacing the XML configuration with Java gives you this much power and flexibility. To complete the example, the service activator for inboundChannel looks like this:

/**
 * Sample 3
 * @param shiporder
 */
@ServiceActivator(inputChannel = "inboundChannel")
public void processMessage(final Shiporder shiporder) {
    System.out.println(shiporder.getOrderid());
    System.out.println(shiporder.getOrderperson());
}

To test the flow, you can send the following XML to the JMS queue via JConsole:

<?xml version="1.0" encoding="UTF-8"?>
<shiporder orderid="889923"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="shiporder.xsd">
<orderperson>John Smith</orderperson>
<shipto>
<name>Ola Nordmann</name>
<address>Langgt 23</address>
<city>4000 Stavanger</city>
<country>Norway</country>
</shipto>
<item>
<title>Empire Burlesque</title>
<note>Special Edition</note>
<quantity>1</quantity>
<price>10.90</price>
</item>
<item>
<title>Hide your heart</title>
<quantity>1</quantity>
<price>9.90</price>
</item>
</shiporder>
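
Before pasting a test message into JConsole, it can help to confirm that it really validates against the schema, since an invalid document would only surface later as an unmarshalling error inside the flow. The helper below is a minimal sketch using the JDK's javax.xml.validation API. The class name XsdCheck and its isValid method are assumptions made for this illustration and are not part of the sample repository.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import org.xml.sax.SAXException;

/** Hypothetical helper (not from the sample repo): validates XML text against XSD text. */
final class XsdCheck {
    private XsdCheck() {
    }

    /** Returns true when {@code xml} validates against {@code xsd}, false otherwise. */
    static boolean isValid(String xsd, String xml) {
        try {
            SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
            Schema schema = factory.newSchema(new StreamSource(new StringReader(xsd)));
            // With no ErrorHandler set, the validator throws on the first error.
            schema.newValidator().validate(new StreamSource(new StringReader(xml)));
            return true;
        } catch (SAXException e) {
            // Raised for a malformed schema, malformed XML, or a schema violation.
            System.err.println("Validation failed: " + e.getMessage());
            return false;
        } catch (IOException e) {
            // Not expected for in-memory readers; rethrown unchecked to keep the signature simple.
            throw new UncheckedIOException(e);
        }
    }
}
```

On Java 11 or later, the two arguments could be loaded with `Files.readString(Path.of("src/main/resources/xsds/shiporder.xsd"))` and a file holding the message above; the location of that message file is up to you.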

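Sample 4: JMS message-driven channel adapter with JAXB and payload-root routing

Another typical case is to accept XML as a JMS text message, convert it into a JAXB stub, and route the payload to a particular service activator based on the payload root type. The SI Java DSL of course supports all kinds of routing; here I will show you how to route by payload type.

First, add the following XSD to the folder that contains shiporder.xsd and name it purchaseorder.xsd: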

<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:tns="http://tempuri.org/PurchaseOrderSchema.xsd"
targetNamespace="http://tempuri.org/PurchaseOrderSchema.xsd"
elementFormDefault="qualified">
<xsd:element name="PurchaseOrder">
<xsd:complexType>
<xsd:sequence>
<xsd:element name="ShipTo" type="tns:USAddress" maxOccurs="2"/>
<xsd:element name="BillTo" type="tns:USAddress"/>
</xsd:sequence>
<xsd:attribute name="OrderDate" type="xsd:date"/>
</xsd:complexType>
</xsd:element>

<xsd:complexType name="USAddress">
<xsd:sequence>
<xsd:element name="name"  type="xsd:string"/>
<xsd:element name="street" type="xsd:string"/>
<xsd:element name="city"  type="xsd:string"/>
<xsd:element name="state" type="xsd:string"/>
<xsd:element name="zip"  type="xsd:integer"/>
</xsd:sequence>
<xsd:attribute name="country" type="xsd:NMTOKEN" fixed="US"/>
</xsd:complexType>
</xsd:schema>

Then add it to the JAXB Maven plugin configuration:

<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>jaxb2-maven-plugin</artifactId>
<version>2.3.1</version>
<executions>
<execution>
<id>xjc-schema1</id>
<goals>
<goal>xjc</goal>
</goals>
<configuration>
<!-- XSD files to generate the JAXB stubs from. -->
<sources>
<source>src/main/resources/xsds/shiporder.xsd</source>
<source>src/main/resources/xsds/purchaseorder.xsd</source>
</sources>

<!-- Package name of the generated sources. -->
<packageName>com.example.stubs</packageName>
<outputDirectory>src/main/java</outputDirectory>
<clearOutputDir>false</clearOutputDir>
</configuration>
</execution>
</executions>
</plugin>

Run mvn clean install to generate the JAXB stubs for the new XSD. Now for the promised payload-root mapping:

@Bean
public Jaxb2Marshaller ordersMarshaller() {
    Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
    marshaller.setContextPath("com.example.stubs");
    return marshaller;
}

/**
 * Sample 4: Jms message driven adapter with Jaxb and Payload routing.
 * @return
 */
@Bean
public JmsMessageDrivenChannelAdapter dataEndpoint() {
    final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
            new ChannelPublishingJmsMessageListener();
    channelPublishingJmsMessageListener.setMessageConverter(new MarshallingMessageConverter(ordersMarshaller()));
    final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter =
            new JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener);

    messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
    return messageDrivenChannelAdapter;
}

@Bean
public IntegrationFlow payloadRootMapping() {
    return IntegrationFlows.from(inboundChannel())
            .<Object, Class<?>>route(Object::getClass, m -> m
                    .subFlowMapping(Shiporder.class, sf -> sf.handle((MessageHandler) message -> {
                        final Shiporder shiporder = (Shiporder) message.getPayload();
                        System.out.println(shiporder.getOrderperson());
                        System.out.println(shiporder.getOrderid());
                    }))
                    .subFlowMapping(PurchaseOrder.class, sf -> sf.handle((MessageHandler) message -> {
                        final PurchaseOrder purchaseOrderType = (PurchaseOrder) message.getPayload();
                        System.out.println(purchaseOrderType.getBillTo().getName());
                    }))
            ).get();
}

Note the payloadRootMapping bean. Let's go through the important parts:

  • <Object, Class<?>> route - the input from inboundChannel is an Object, and routing is performed on its Class<?>
  • subFlowMapping(Shiporder.class ... - handles Shiporder payloads.
  • subFlowMapping(PurchaseOrder.class ... - handles PurchaseOrder payloads.
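
The lookup-by-class idea behind these bullets can be illustrated without Spring. The sketch below is a hypothetical plain-Java class: PayloadTypeRouter, on, and route are made-up names and not Spring Integration API. It only models the general technique of using payload.getClass() as the routing key and dispatching to the handler registered for it. It matches the exact runtime class only, which is a simplification chosen for this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Toy payload-type router; a plain-Java illustration, NOT Spring Integration code. */
final class PayloadTypeRouter {
    private final Map<Class<?>, Consumer<Object>> routes = new LinkedHashMap<>();

    /** Registers a handler for payloads whose runtime class is exactly {@code type}. */
    <T> PayloadTypeRouter on(Class<T> type, Consumer<T> handler) {
        // type.cast keeps the handler typed, so callers need no manual cast.
        routes.put(type, payload -> handler.accept(type.cast(payload)));
        return this;
    }

    /** Dispatches by payload.getClass(); returns false when no handler is registered. */
    boolean route(Object payload) {
        Consumer<Object> handler = routes.get(payload.getClass());
        if (handler == null) {
            return false;
        }
        handler.accept(payload);
        return true;
    }
}
```

With the generated stubs on the classpath, registering `on(Shiporder.class, s -> System.out.println(s.getOrderid()))` and `on(PurchaseOrder.class, p -> System.out.println(p.getBillTo().getName()))` would mirror the two subflow mappings above. One difference from the DSL version is that the typed Consumer removes the explicit cast of message.getPayload().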

To test the Shiporder payload, use the XML from sample 3. To test the PurchaseOrder payload, use the following XML:

<?xml version="1.0" encoding="utf-8"?>
<PurchaseOrder OrderDate="1900-01-01" xmlns="http://tempuri.org/PurchaseOrderSchema.xsd">
<ShipTo country="US">
<name>name1</name>
<street>street1</street>
<city>city1</city>
<state>state1</state>
<zip>1</zip>
</ShipTo>
<ShipTo country="US">
<name>name2</name>
<street>street2</street>
<city>city2</city>
<state>state2</state>
<zip>-79228162514264337593543950335</zip>
</ShipTo>
<BillTo country="US">
<name>name1</name>
<street>street1</street>
<city>city1</city>
<state>state1</state>
<zip>1</zip>
</BillTo>
</PurchaseOrder>

Both payloads should be routed according to the subflow mappings.

Sample 5: IntegrationFlowAdapter

Besides the other implementations of Enterprise Integration Patterns (check them out), I need to mention IntegrationFlowAdapter. By extending this class and implementing the buildFlow method, like this:

@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {

    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(Amqp.inboundAdapter(this.rabbitConnectionFactory, "myQueue"))
                .<String, String>transform(String::toLowerCase)
                .channel(c -> c.queue("myFlowAdapterOutput"));
    }
}

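you can wrap repeated bean declarations into a single component and give it the flow it needs. Such a component can then be configured and handed to the calling code as one class instance!

So let's make sample 3 from this repo a bit shorter by defining a base class for all JmsEndpoints, with the repeated beans declared inside it: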

public class JmsEndpoint extends IntegrationFlowAdapter {

    private String queueName;

    private String channelName;

    private String contextPath;

    /**
     * @param queueName
     * @param channelName
     * @param contextPath
     */
    public JmsEndpoint(String queueName, String channelName, String contextPath) {
        this.queueName = queueName;
        this.channelName = channelName;
        this.contextPath = contextPath;
    }

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(Jms.messageDrivenChannelAdapter(listenerContainer())
                .jmsMessageConverter(new MarshallingMessageConverter(shipOrdersMarshaller()))
        ).channel(channelName);
    }

    @Bean
    public Jaxb2Marshaller shipOrdersMarshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setContextPath(contextPath);
        return marshaller;
    }

    @Bean
    public DynamicDestinationResolver dynamicDestinationResolver() {
        return new DynamicDestinationResolver();
    }

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory();
    }

    @Bean
    public DefaultMessageListenerContainer listenerContainer() {
        final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
        defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
        defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
        defaultMessageListenerContainer.setDestinationName(queueName);
        return defaultMessageListenerContainer;
    }

    @Bean
    public MessageChannel inboundChannel() {
        return MessageChannels.direct(channelName).get();
    }
}

Declaring a JMS endpoint for a specific queue is now easy:

@Bean
public JmsEndpoint jmsEndpoint() {
    return new JmsEndpoint("jms.activeMQ.Test", "inboundChannel", "com.example.stubs");
}

The service activator for inboundChannel:

/**
 * Sample 3, 5
 * @param shiporder
 */
@ServiceActivator(inputChannel = "inboundChannel")
public void processMessage(final Shiporder shiporder) {
    System.out.println(shiporder.getOrderid());
    System.out.println(shiporder.getOrderperson());
}

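You shouldn't miss out on using IntegrationFlowAdapter in your projects. I like the concept.

I recently started using the Spring Integration Java DSL in a new Spring Boot based project at Embedit. Even though it takes some configuration, I find it very useful:

  • It is easy to debug, with no extra configuration such as a wiretap.
  • It is much easier to read. Yes, even with lambdas!
  • It is powerful. Java configuration now gives you plenty of options.

Source code: https://bitbucket.org/tomask79/spring-integration-java-dsl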
