Spring Integration
2015-05-29 00:00
411 查看
摘要: Spring Integration
The Cafe Sample(小卖部订餐例子)
小卖部有一个订饮料服务,客户可以通过订单来订购所需要饮料。小卖部提供两种咖啡饮料
LATTE(拿铁咖啡)和MOCHA(摩卡咖啡)。每种又都分冷饮和热饮
整个流程如下:
1.有一个下订单模块,用户可以按要求下一个或多个订单。
2.有一个订单处理模块,处理订单中那些是关于订购饮料的。
3.有一个饮料订购处理模块,处理拆分订购的具体是那些种类的饮料,把具体需要生产的饮料要求发给生产模块
4.有一个生产模块,进行生产。
5.等生成完成后,有一个订单确认模块(Waiter),把订单的生成的饮料输出。
这个例子利用Spring Integration实现了灵活的,可配置化的模式集成了上述这些服务模块。
Spring Integration提供两种模式的工作方式(Annotation和XML)
先来看一下XML方式,进行示例的开发:
配置文件如下:
<?
xml version="1.0" encoding="UTF-8"
?>
<
beans:beans
xmlns
="http://www.springframework.org/schema/integration"
xmlns:xsi
="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans
="http://www.springframework.org/schema/beans"
xmlns:stream
="http://www.springframework.org/schema/integration/stream"
xsi:schemaLocation
="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-1.0.xsd http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream-1.0.xsd" >
<!--
首先来配置一个GateWay组件,提供消息的发送和接收。接口Cafe,提供一个void placeOrder(Order order);方法
该方法标记了@Gateway(requestChannel="orders"), 实现向orders队列实现数据的发送
-->
<
gateway
id
="cafe"
service-interface
="org.springframework.integration.samples.cafe.Cafe"
/>
<!--
订单Channel
-->
<
channel
id
="orders"
/>
<!--
实现Splitter模式, 接收 orders队列的消息,调用orderSplitter Bean的split方法,进行消息的分解
并把分解后的消息,发送到drinks队列.
-->
<
splitter
input-channel
="orders"
ref
="orderSplitter"
method
="split"
output-channel
="drinks"
/>
<!--
饮料订单Channel,处理饮料的类别
-->
<
channel
id
="drinks"
/>
<!--
实现Router模式,接收 drinks队列的消息, 并触发 drinkRouter Bean的 resolveOrderItemChannel方法
由在 resolveOrderItemChannel该方法的返回值(String--队列名称)表示把消息路由到那个队列上
-->
<
router
input-channel
="drinks"
ref
="drinkRouter"
method
="resolveOrderItemChannel"
/>
<!--
冷饮生产Channel 最大待处理的数据量为 10
-->
<
channel
id
="coldDrinks"
>
<
queue
capacity
="10"
/>
</
channel
>
<!--
定义一个服务处理器,其作用是定义一个消息接收队列 codeDrinks,一但收到消息,则
触发 barista Bean的 prepareColdDrink方法, 再把 prepareColdDrink方法的值,封成Message的
payLoad属性,把消息再发送到preparedDrinks队列,
-->
<
service-activator
input-channel
="coldDrinks"
ref
="barista"
method
="prepareColdDrink"
output-channel
="preparedDrinks"
/>
<!--
热饮生产Channel 最大待处理的数据量为 10
-->
<
channel
id
="hotDrinks"
>
<
queue
capacity
="10"
/>
</
channel
>
<!--
定义一个服务处理器,其作用是定义一个消息接收队列 hotDrinks,一但收到消息,则
触发 barista Bean的 prepareHotDrink 再把 prepareColdDrink方法的值,封成Message的
payLoad属性,把消息再发送到preparedDrinks队列,
-->
<
service-activator
input-channel
="hotDrinks"
ref
="barista"
method
="prepareHotDrink"
output-channel
="preparedDrinks"
/>
<!--
定义最终进行生产的消息队列
-->
<
channel
id
="preparedDrinks"
/>
<!--
实现 aggregator 模式, 接收 preparedDrinks 消息, 并触发 waiter Bean的prepareDelivery方法
再把处理好的数据,发送到 deliveries队列
-->
<
aggregator
input-channel
="preparedDrinks"
ref
="waiter"
method
="prepareDelivery"
output-channel
="deliveries"
/>
<!--
定义一个 stream 适配器,接收 deliveries队列的消息后,直接输出到屏幕
-->
<
stream:stdout-channel-adapter
id
="deliveries"
/>
<
beans:bean
id
="orderSplitter"
class
="org.springframework.integration.samples.cafe.xml.OrderSplitter"
/>
<
beans:bean
id
="drinkRouter"
class
="org.springframework.integration.samples.cafe.xml.DrinkRouter"
/>
<
beans:bean
id
="barista"
class
="org.springframework.integration.samples.cafe.xml.Barista"
/>
<
beans:bean
id
="waiter"
class
="org.springframework.integration.samples.cafe.xml.Waiter"
/>
</
beans:beans
>
我们来看一下整体服务是怎么启动的
首先我们来看一下CafeDemo这个类,它触发下定单操作
org.springframework.integration.samples.cafe.xml.CafeDemo
1
public
class
CafeDemo {
2
3
public
static
void
main(String[] args) {
4
////
加载Spring 配置文件 "cafeDemo.xml"
5
AbstractApplicationContext context
=
null
;
6
if
(args.length
>
0
) {
7
context
=
new
FileSystemXmlApplicationContext(args);
8
}
9
else
{
10
context
=
new
ClassPathXmlApplicationContext(
"
cafeDemo.xml
"
, CafeDemo.
class
);
11
}
12
//
取得 Cafe实列
13
Cafe cafe
=
(Cafe) context.getBean(
"
cafe
"
);
14
//
准备 发送100条消息(订单)
15
for
(
int
i
=
1
; i
<=
100
; i
++
) {
16
Order order
=
new
Order(i);
17
//
一杯热饮 参数说明1.饮料类型 2.数量 3.是否是冷饮(true表示冷饮)
18
order.addItem(DrinkType.LATTE,
2
,
false
);
19
//
一杯冷饮 参数说明1.饮料类型 2.数量 3.是否是冷饮(true表示冷饮)
20
order.addItem(DrinkType.MOCHA,
3
,
true
);
21
//
下发订单,把消息发给 orders 队列
22
cafe.placeOrder(order);
23
}
24
}
25
26
}
下面是Cafe接口的源代码
public
interface
Cafe {
//
定义GateWay, 把消息发送到 orders 队列, Message的payLoad属性,保存 order参数值
@Gateway(requestChannel
=
"
orders
"
)
void
placeOrder(Order order);
}
OrderSplitter 源代码
1
public
class
OrderSplitter {
2
3
//
接收 从 orders队列接收的 order 消息后,调用 order.getItems方法
4
//
进行订单的分解, 返回的List<OrderItem>可会,被拆分为多个消息后(Message.payLoad),发到指定队列
5
public
List
<
OrderItem
>
split(Order order) {
6
return
order.getItems();
7
}
8
9
}
10
OrderSplitter.split把消息拆分后,变成多个消息,发送到
drinks队列.由drinkRouter进行消息的接收。
1
public
class
DrinkRouter {
2
3
//
从 drinks队列的消息后,根据orderItem的属性,选择路由到不同的队列 coldDrinks或hotDrinks
4
public
String resolveOrderItemChannel(OrderItem orderItem) {
5
return
(orderItem.isIced())
?
"
coldDrinks
"
:
"
hotDrinks
"
;
6
}
7
8
}
下面看一下,如果是一杯冷饮,则消息发送到 coldDrinks队列
接收根据配置,由barista Bean的prepareColdDrink方法接收消息后,进行处理
如果是一杯热饮,则消息发送到 hotDrinks队列
接收根据配置,由barista Bean的prepareHotDrink方法接收消息后,进行处理
1
public
class
Barista {
2
3
private
long
hotDrinkDelay
=
5000
;
4
5
private
long
coldDrinkDelay
=
1000
;
6
7
private
AtomicInteger hotDrinkCounter
=
new
AtomicInteger();
8
9
private
AtomicInteger coldDrinkCounter
=
new
AtomicInteger();
10
11
12
public
void
setHotDrinkDelay(
long
hotDrinkDelay) {
13
this
.hotDrinkDelay
=
hotDrinkDelay;
14
}
15
16
public
void
setColdDrinkDelay(
long
coldDrinkDelay) {
17
this
.coldDrinkDelay
=
coldDrinkDelay;
18
}
19
20
//
处理热饮订单,并生成Drink冷料
21
public
Drink prepareHotDrink(OrderItem orderItem) {
22
try
{
23
Thread.sleep(
this
.hotDrinkDelay);
24
System.out.println(Thread.currentThread().getName()
25
+
"
prepared hot drink #
"
+
hotDrinkCounter.incrementAndGet()
+
"
for order #
"
26
+
orderItem.getOrder().getNumber()
+
"
:
"
+
orderItem);
27
return
new
Drink(orderItem.getOrder().getNumber(), orderItem.getDrinkType(), orderItem.isIced(),
28
orderItem.getShots());
29
}
catch
(InterruptedException e) {
30
Thread.currentThread().interrupt();
31
return
null
;
32
}
33
}
34
35
//
处理冷饮订单,并生成Drink冷料
36
public
Drink prepareColdDrink(OrderItem orderItem) {
37
try
{
38
Thread.sleep(
this
.coldDrinkDelay);
39
System.out.println(Thread.currentThread().getName()
40
+
"
prepared cold drink #
"
+
coldDrinkCounter.incrementAndGet()
+
"
for order #
"
41
+
orderItem.getOrder().getNumber()
+
"
:
"
+
orderItem);
42
return
new
Drink(orderItem.getOrder().getNumber(), orderItem.getDrinkType(), orderItem.isIced(),
43
orderItem.getShots());
44
}
catch
(InterruptedException e) {
45
Thread.currentThread().interrupt();
46
return
null
;
47
}
48
}
49
50
}
接下来,已经把订单需要生产的饮料已经完成,现在可以交给服务员(waier)交给客人了。
这里使用的aggregate模式,让服务器等待这个订单的所有饮料生产完后的,交给客户.
下面来介绍该应用
<!--
一旦定义了 aggregator,其会自动监测队列的消息,把消息合并后再发生指定的队列
一般aggregator的参照 splitter一起使用。Spring Integration会根据接收到的消息中的消息头CORRELATION_ID 来判断,如果有相同的CORRELATION_ID发现,则认为它们需要合成一组,并返回(如果没有自定义合组接口)。
当然Spring Integration也提供一个用户自定的接口来判定消息合组是否满足要求
public
interface
CompletionStrategy {
boolean
isComplete(List
<
Message
<?>>
messages);
}
isComplete的方法,收到的messages消息,都是拥用相同消息头CORRELATION_ID的消息。
-->
<
aggregator
input-channel
="preparedDrinks"
ref
="waiter"
method
="prepareDelivery"
output-channel
="deliveries"
/>
最后,完成订单的消息会发到 waiter队列
1
public
class
Waiter {
2
3
public
Delivery prepareDelivery(List
<
Drink
>
drinks) {
4
return
new
Delivery(drinks);
5
}
6
7
8
}
9
10
public
class
Delivery {
11
12
private
static
final
String SEPARATOR
=
"
-----------------------
"
;
13
14
15
private
List
<
Drink
>
deliveredDrinks;
16
17
private
int
orderNumber;
18
19
20
public
Delivery(List
<
Drink
>
deliveredDrinks) {
21
assert
(deliveredDrinks.size()
>
0
);
22
this
.deliveredDrinks
=
deliveredDrinks;
23
this
.orderNumber
=
deliveredDrinks.get(
0
).getOrderNumber();
24
}
25
26
27
public
int
getOrderNumber() {
28
return
orderNumber;
29
}
30
31
public
List
<
Drink
>
getDeliveredDrinks() {
32
return
deliveredDrinks;
33
}
34
35
@Override
36
public
String toString() {
37
StringBuffer buffer
=
new
StringBuffer(SEPARATOR
+
"
\n
"
);
38
buffer.append(
"
Order #
"
+
getOrderNumber()
+
"
\n
"
);
39
for
(Drink drink : getDeliveredDrinks()) {
40
buffer.append(drink);
41
buffer.append(
"
\n
"
);
42
}
43
buffer.append(SEPARATOR
+
"
\n
"
);
44
return
buffer.toString();
45
}
46
47
}
最后我们使用一个 stream channel adaptor把订单生产完成的饮料输出。
<!--
定义一个 stream 适配器,接收 deliveries队列的消息后,直接输出到屏幕
-->
<
stream:stdout-channel-adapter
id
="deliveries"
/>
这样整个流程就执行完了,最终我们的饮料产品就按照订单生产出来了。累了吧,喝咖啡提神着呢!!!
spring-integration官网:
http://www.springsource.org/spring-integration
关于 Annotation的介绍,将在
下篇
介绍。
附:xml配置介绍
Service Activator 配置
1
<!--
配置 Service Activator,接收exampleChannel队列消息。注:exampleHandler至少有一个方法@ServiceActivator
-->
2
<
service-activator
input-channel
="exampleChannel"
ref
="exampleHandler"
/>
3
<!--
会检查 someMethod方法,是否有 @ServiceActivato 标注 output-channel
-->
4
<
service-activator
input-channel
="exampleChannel"
ref
="somePojo"
method
="someMethod"
/>
5
<
service-activator
input-channel
="exampleChannel"
output-channel
="replyChannel"
6
ref
="somePojo"
method
="someMethod"
/>
<inbound-channel-adapter>
触发指定的方法,接收消息队列配置(触发轮循访问的方式)
1
<
inbound-channel-adapter
ref
="source1"
method
="method1"
channel
="channel1"
>
2
<
poller
>
3
<
interval-trigger
interval
="5000"
/>
4
</
poller
>
5
</
inbound-channel-adapter
>
6
7
<
inbound-channel-adapter
ref
="source2"
method
="method2"
channel
="channel2"
>
8
<
poller
>
9
<
cron-trigger
expression
="30 * * * * MON-FRI"
/>
10
</
poller
>
11
</
channel-adapter
>
<outbound-channel-adapter/>
触发指定的方法,发送消息
1
<
outbound-channel-adapter
channel
="channel1"
ref
="target1"
method
="method1"
/>
2
3
<
outbound-channel-adapter
channel
="channel2"
ref
="target2"
method
="method2"
>
4
<
poller
>
5
<
interval-trigger
interval
="3000"
/>
6
</
poller
>
7
</
outbound-channel-adapter
>
Router
消息路由方式
1
<
bean
id
="payloadTypeRouter"
class
="org.springframework.integration.router.PayloadTypeRouter"
>
2
<
property
name
="payloadTypeChannelMap"
>
3
<
map
>
4
<
entry
key
="java.lang.String"
value-ref
="stringChannel"
/>
5
<
entry
key
="java.lang.Integer"
value-ref
="integerChannel"
/>
6
</
map
>
7
</
property
>
8
</
bean
>
Aggregator 消息合并
1
<
channel
id
="inputChannel"
/>
2
3
<
aggregator
id
="completelyDefinedAggregator"
1
4
input-channel
="inputChannel"
2
5
output-channel
="outputChannel"
3
6
discard-channel
="discardChannel"
4
7
ref
="aggregatorBean"
5
8
method
="add"
6
9
completion-strategy
="completionStrategyBean"
7
10
completion-strategy-method
="checkCompleteness"
8
11
timeout
="42"
9
12
send-partial-result-on-timeout
="true"
10
13
reaper-interval
="135"
11
14
tracked-correlation-id-capacity
="99"
12
15
send-timeout
="86420000"
13
/>
16
17
<
channel
id
="outputChannel"
/>
18
19
<
bean
id
="aggregatorBean"
class
="sample.PojoAggregator"
/>
20
21
<
bean
id
="completionStrategyBean"
class
="sample.PojoCompletionStrategy"
/>
1
public
class
PojoCompletionStrategy {
2
3
public
boolean
checkCompleteness(List
<
Long
>
numbers) {
4
int
sum
=
0
;
5
for
(
long
number: numbers) {
6
sum
+=
number;
7
}
8
return
sum
>=
maxValue;
9
}
10
}
Good Luck!
Yours Matthew!
The Cafe Sample(小卖部订餐例子)
小卖部有一个订饮料服务,客户可以通过订单来订购所需要饮料。小卖部提供两种咖啡饮料
LATTE(拿铁咖啡)和MOCHA(摩卡咖啡)。每种又都分冷饮和热饮
整个流程如下:
1.有一个下订单模块,用户可以按要求下一个或多个订单。
2.有一个订单处理模块,处理订单中那些是关于订购饮料的。
3.有一个饮料订购处理模块,处理拆分订购的具体是那些种类的饮料,把具体需要生产的饮料要求发给生产模块
4.有一个生产模块,进行生产。
5.等生成完成后,有一个订单确认模块(Waiter),把订单的生成的饮料输出。
这个例子利用Spring Integration实现了灵活的,可配置化的模式集成了上述这些服务模块。
Spring Integration提供两种模式的工作方式(Annotation和XML)
先来看一下XML方式,进行示例的开发:
配置文件如下:
<?
xml version="1.0" encoding="UTF-8"
?>
<
beans:beans
xmlns
="http://www.springframework.org/schema/integration"
xmlns:xsi
="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans
="http://www.springframework.org/schema/beans"
xmlns:stream
="http://www.springframework.org/schema/integration/stream"
xsi:schemaLocation
="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-1.0.xsd http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream-1.0.xsd" >
<!--
首先来配置一个GateWay组件,提供消息的发送和接收。接口Cafe,提供一个void placeOrder(Order order);方法
该方法标记了@Gateway(requestChannel="orders"), 实现向orders队列实现数据的发送
-->
<
gateway
id
="cafe"
service-interface
="org.springframework.integration.samples.cafe.Cafe"
/>
<!--
订单Channel
-->
<
channel
id
="orders"
/>
<!--
实现Splitter模式, 接收 orders队列的消息,调用orderSplitter Bean的split方法,进行消息的分解
并把分解后的消息,发送到drinks队列.
-->
<
splitter
input-channel
="orders"
ref
="orderSplitter"
method
="split"
output-channel
="drinks"
/>
<!--
饮料订单Channel,处理饮料的类别
-->
<
channel
id
="drinks"
/>
<!--
实现Router模式,接收 drinks队列的消息, 并触发 drinkRouter Bean的 resolveOrderItemChannel方法
由在 resolveOrderItemChannel该方法的返回值(String--队列名称)表示把消息路由到那个队列上
-->
<
router
input-channel
="drinks"
ref
="drinkRouter"
method
="resolveOrderItemChannel"
/>
<!--
冷饮生产Channel 最大待处理的数据量为 10
-->
<
channel
id
="coldDrinks"
>
<
queue
capacity
="10"
/>
</
channel
>
<!--
定义一个服务处理器,其作用是定义一个消息接收队列 codeDrinks,一但收到消息,则
触发 barista Bean的 prepareColdDrink方法, 再把 prepareColdDrink方法的值,封成Message的
payLoad属性,把消息再发送到preparedDrinks队列,
-->
<
service-activator
input-channel
="coldDrinks"
ref
="barista"
method
="prepareColdDrink"
output-channel
="preparedDrinks"
/>
<!--
热饮生产Channel 最大待处理的数据量为 10
-->
<
channel
id
="hotDrinks"
>
<
queue
capacity
="10"
/>
</
channel
>
<!--
定义一个服务处理器,其作用是定义一个消息接收队列 hotDrinks,一但收到消息,则
触发 barista Bean的 prepareHotDrink 再把 prepareColdDrink方法的值,封成Message的
payLoad属性,把消息再发送到preparedDrinks队列,
-->
<
service-activator
input-channel
="hotDrinks"
ref
="barista"
method
="prepareHotDrink"
output-channel
="preparedDrinks"
/>
<!--
定义最终进行生产的消息队列
-->
<
channel
id
="preparedDrinks"
/>
<!--
实现 aggregator 模式, 接收 preparedDrinks 消息, 并触发 waiter Bean的prepareDelivery方法
再把处理好的数据,发送到 deliveries队列
-->
<
aggregator
input-channel
="preparedDrinks"
ref
="waiter"
method
="prepareDelivery"
output-channel
="deliveries"
/>
<!--
定义一个 stream 适配器,接收 deliveries队列的消息后,直接输出到屏幕
-->
<
stream:stdout-channel-adapter
id
="deliveries"
/>
<
beans:bean
id
="orderSplitter"
class
="org.springframework.integration.samples.cafe.xml.OrderSplitter"
/>
<
beans:bean
id
="drinkRouter"
class
="org.springframework.integration.samples.cafe.xml.DrinkRouter"
/>
<
beans:bean
id
="barista"
class
="org.springframework.integration.samples.cafe.xml.Barista"
/>
<
beans:bean
id
="waiter"
class
="org.springframework.integration.samples.cafe.xml.Waiter"
/>
</
beans:beans
>
我们来看一下整体服务是怎么启动的
首先我们来看一下CafeDemo这个类,它触发下定单操作
org.springframework.integration.samples.cafe.xml.CafeDemo
1
public
class
CafeDemo {
2
3
public
static
void
main(String[] args) {
4
////
加载Spring 配置文件 "cafeDemo.xml"
5
AbstractApplicationContext context
=
null
;
6
if
(args.length
>
0
) {
7
context
=
new
FileSystemXmlApplicationContext(args);
8
}
9
else
{
10
context
=
new
ClassPathXmlApplicationContext(
"
cafeDemo.xml
"
, CafeDemo.
class
);
11
}
12
//
取得 Cafe实列
13
Cafe cafe
=
(Cafe) context.getBean(
"
cafe
"
);
14
//
准备 发送100条消息(订单)
15
for
(
int
i
=
1
; i
<=
100
; i
++
) {
16
Order order
=
new
Order(i);
17
//
一杯热饮 参数说明1.饮料类型 2.数量 3.是否是冷饮(true表示冷饮)
18
order.addItem(DrinkType.LATTE,
2
,
false
);
19
//
一杯冷饮 参数说明1.饮料类型 2.数量 3.是否是冷饮(true表示冷饮)
20
order.addItem(DrinkType.MOCHA,
3
,
true
);
21
//
下发订单,把消息发给 orders 队列
22
cafe.placeOrder(order);
23
}
24
}
25
26
}
下面是Cafe接口的源代码
public
interface
Cafe {
//
定义GateWay, 把消息发送到 orders 队列, Message的payLoad属性,保存 order参数值
@Gateway(requestChannel
=
"
orders
"
)
void
placeOrder(Order order);
}
OrderSplitter 源代码
1
public
class
OrderSplitter {
2
3
//
接收 从 orders队列接收的 order 消息后,调用 order.getItems方法
4
//
进行订单的分解, 返回的List<OrderItem>可会,被拆分为多个消息后(Message.payLoad),发到指定队列
5
public
List
<
OrderItem
>
split(Order order) {
6
return
order.getItems();
7
}
8
9
}
10
OrderSplitter.split把消息拆分后,变成多个消息,发送到
drinks队列.由drinkRouter进行消息的接收。
1
public
class
DrinkRouter {
2
3
//
从 drinks队列的消息后,根据orderItem的属性,选择路由到不同的队列 coldDrinks或hotDrinks
4
public
String resolveOrderItemChannel(OrderItem orderItem) {
5
return
(orderItem.isIced())
?
"
coldDrinks
"
:
"
hotDrinks
"
;
6
}
7
8
}
下面看一下,如果是一杯冷饮,则消息发送到 coldDrinks队列
接收根据配置,由barista Bean的prepareColdDrink方法接收消息后,进行处理
如果是一杯热饮,则消息发送到 hotDrinks队列
接收根据配置,由barista Bean的prepareHotDrink方法接收消息后,进行处理
1
public
class
Barista {
2
3
private
long
hotDrinkDelay
=
5000
;
4
5
private
long
coldDrinkDelay
=
1000
;
6
7
private
AtomicInteger hotDrinkCounter
=
new
AtomicInteger();
8
9
private
AtomicInteger coldDrinkCounter
=
new
AtomicInteger();
10
11
12
public
void
setHotDrinkDelay(
long
hotDrinkDelay) {
13
this
.hotDrinkDelay
=
hotDrinkDelay;
14
}
15
16
public
void
setColdDrinkDelay(
long
coldDrinkDelay) {
17
this
.coldDrinkDelay
=
coldDrinkDelay;
18
}
19
20
//
处理热饮订单,并生成Drink冷料
21
public
Drink prepareHotDrink(OrderItem orderItem) {
22
try
{
23
Thread.sleep(
this
.hotDrinkDelay);
24
System.out.println(Thread.currentThread().getName()
25
+
"
prepared hot drink #
"
+
hotDrinkCounter.incrementAndGet()
+
"
for order #
"
26
+
orderItem.getOrder().getNumber()
+
"
:
"
+
orderItem);
27
return
new
Drink(orderItem.getOrder().getNumber(), orderItem.getDrinkType(), orderItem.isIced(),
28
orderItem.getShots());
29
}
catch
(InterruptedException e) {
30
Thread.currentThread().interrupt();
31
return
null
;
32
}
33
}
34
35
//
处理冷饮订单,并生成Drink冷料
36
public
Drink prepareColdDrink(OrderItem orderItem) {
37
try
{
38
Thread.sleep(
this
.coldDrinkDelay);
39
System.out.println(Thread.currentThread().getName()
40
+
"
prepared cold drink #
"
+
coldDrinkCounter.incrementAndGet()
+
"
for order #
"
41
+
orderItem.getOrder().getNumber()
+
"
:
"
+
orderItem);
42
return
new
Drink(orderItem.getOrder().getNumber(), orderItem.getDrinkType(), orderItem.isIced(),
43
orderItem.getShots());
44
}
catch
(InterruptedException e) {
45
Thread.currentThread().interrupt();
46
return
null
;
47
}
48
}
49
50
}
接下来,已经把订单需要生产的饮料已经完成,现在可以交给服务员(waier)交给客人了。
这里使用的aggregate模式,让服务器等待这个订单的所有饮料生产完后的,交给客户.
下面来介绍该应用
<!--
一旦定义了 aggregator,其会自动监测队列的消息,把消息合并后再发生指定的队列
一般aggregator的参照 splitter一起使用。Spring Integration会根据接收到的消息中的消息头CORRELATION_ID 来判断,如果有相同的CORRELATION_ID发现,则认为它们需要合成一组,并返回(如果没有自定义合组接口)。
当然Spring Integration也提供一个用户自定的接口来判定消息合组是否满足要求
public
interface
CompletionStrategy {
boolean
isComplete(List
<
Message
<?>>
messages);
}
isComplete的方法,收到的messages消息,都是拥用相同消息头CORRELATION_ID的消息。
-->
<
aggregator
input-channel
="preparedDrinks"
ref
="waiter"
method
="prepareDelivery"
output-channel
="deliveries"
/>
最后,完成订单的消息会发到 waiter队列
1
public
class
Waiter {
2
3
public
Delivery prepareDelivery(List
<
Drink
>
drinks) {
4
return
new
Delivery(drinks);
5
}
6
7
8
}
9
10
public
class
Delivery {
11
12
private
static
final
String SEPARATOR
=
"
-----------------------
"
;
13
14
15
private
List
<
Drink
>
deliveredDrinks;
16
17
private
int
orderNumber;
18
19
20
public
Delivery(List
<
Drink
>
deliveredDrinks) {
21
assert
(deliveredDrinks.size()
>
0
);
22
this
.deliveredDrinks
=
deliveredDrinks;
23
this
.orderNumber
=
deliveredDrinks.get(
0
).getOrderNumber();
24
}
25
26
27
public
int
getOrderNumber() {
28
return
orderNumber;
29
}
30
31
public
List
<
Drink
>
getDeliveredDrinks() {
32
return
deliveredDrinks;
33
}
34
35
@Override
36
public
String toString() {
37
StringBuffer buffer
=
new
StringBuffer(SEPARATOR
+
"
\n
"
);
38
buffer.append(
"
Order #
"
+
getOrderNumber()
+
"
\n
"
);
39
for
(Drink drink : getDeliveredDrinks()) {
40
buffer.append(drink);
41
buffer.append(
"
\n
"
);
42
}
43
buffer.append(SEPARATOR
+
"
\n
"
);
44
return
buffer.toString();
45
}
46
47
}
最后我们使用一个 stream channel adaptor把订单生产完成的饮料输出。
<!--
定义一个 stream 适配器,接收 deliveries队列的消息后,直接输出到屏幕
-->
<
stream:stdout-channel-adapter
id
="deliveries"
/>
这样整个流程就执行完了,最终我们的饮料产品就按照订单生产出来了。累了吧,喝咖啡提神着呢!!!
spring-integration官网:
http://www.springsource.org/spring-integration
关于 Annotation的介绍,将在
下篇
介绍。
附:xml配置介绍
Service Activator 配置
1
<!--
配置 Service Activator,接收exampleChannel队列消息。注:exampleHandler至少有一个方法@ServiceActivator
-->
2
<
service-activator
input-channel
="exampleChannel"
ref
="exampleHandler"
/>
3
<!--
会检查 someMethod方法,是否有 @ServiceActivato 标注 output-channel
-->
4
<
service-activator
input-channel
="exampleChannel"
ref
="somePojo"
method
="someMethod"
/>
5
<
service-activator
input-channel
="exampleChannel"
output-channel
="replyChannel"
6
ref
="somePojo"
method
="someMethod"
/>
<inbound-channel-adapter>
触发指定的方法,接收消息队列配置(触发轮循访问的方式)
1
<
inbound-channel-adapter
ref
="source1"
method
="method1"
channel
="channel1"
>
2
<
poller
>
3
<
interval-trigger
interval
="5000"
/>
4
</
poller
>
5
</
inbound-channel-adapter
>
6
7
<
inbound-channel-adapter
ref
="source2"
method
="method2"
channel
="channel2"
>
8
<
poller
>
9
<
cron-trigger
expression
="30 * * * * MON-FRI"
/>
10
</
poller
>
11
</
channel-adapter
>
<outbound-channel-adapter/>
触发指定的方法,发送消息
1
<
outbound-channel-adapter
channel
="channel1"
ref
="target1"
method
="method1"
/>
2
3
<
outbound-channel-adapter
channel
="channel2"
ref
="target2"
method
="method2"
>
4
<
poller
>
5
<
interval-trigger
interval
="3000"
/>
6
</
poller
>
7
</
outbound-channel-adapter
>
Router
消息路由方式
1
<
bean
id
="payloadTypeRouter"
class
="org.springframework.integration.router.PayloadTypeRouter"
>
2
<
property
name
="payloadTypeChannelMap"
>
3
<
map
>
4
<
entry
key
="java.lang.String"
value-ref
="stringChannel"
/>
5
<
entry
key
="java.lang.Integer"
value-ref
="integerChannel"
/>
6
</
map
>
7
</
property
>
8
</
bean
>
Aggregator 消息合并
1
<
channel
id
="inputChannel"
/>
2
3
<
aggregator
id
="completelyDefinedAggregator"
1
4
input-channel
="inputChannel"
2
5
output-channel
="outputChannel"
3
6
discard-channel
="discardChannel"
4
7
ref
="aggregatorBean"
5
8
method
="add"
6
9
completion-strategy
="completionStrategyBean"
7
10
completion-strategy-method
="checkCompleteness"
8
11
timeout
="42"
9
12
send-partial-result-on-timeout
="true"
10
13
reaper-interval
="135"
11
14
tracked-correlation-id-capacity
="99"
12
15
send-timeout
="86420000"
13
/>
16
17
<
channel
id
="outputChannel"
/>
18
19
<
bean
id
="aggregatorBean"
class
="sample.PojoAggregator"
/>
20
21
<
bean
id
="completionStrategyBean"
class
="sample.PojoCompletionStrategy"
/>
1
public
class
PojoCompletionStrategy {
2
3
public
boolean
checkCompleteness(List
<
Long
>
numbers) {
4
int
sum
=
0
;
5
for
(
long
number: numbers) {
6
sum
+=
number;
7
}
8
return
sum
>=
maxValue;
9
}
10
}
Good Luck!
Yours Matthew!
相关文章推荐
- SpringMVC中通过@ResponseBody返回对象,Js中调用@ResponseBody返回值,统计剩余评论字数的js,@RequestParam默认值,@PathVariable的用法
- SpringMVC中通过@ResponseBody返回对象,Js中调用@ResponseBody返回值,统计剩余评论字数的js,@RequestParam默认值,@PathVariable的用法
- Java的super调用案例: super.getClass()返回的是子类自己
- 《Spring 2之站立会议3》
- JAVA基础笔记——String
- 设计模式(主要java)
- 《Spring2之站立会议2》
- java基础篇——包
- 《Spring2之站立会议1》
- myeclipse修改背景颜色(主题)
- Struts2的主要学习内容
- Java Web目录
- 轻量级javaEE SSH 05: Hibernate
- 轻量级javaEE SSH 04: struts2
- Java基础——其他对象
- Java并发编程:volatile关键字解析
- 轻量级javaEE SSH 03: Spring
- JavaDome
- JAVA敏捷开发环境搭建
- win7 下配置 java 环境变量