您的位置:首页 > 编程语言 > Java开发

Java生产消费者模式之项目实践

2015-07-15 16:01 337 查看
故事一般都是这么开始的……

最近笔者应公司小编的要求写一篇有点深度的文章, 于是在大脑里翻来覆去想找点稍微有点深度的东西,结果笔者很失望。

于是……

笔者翻箱倒柜,终于找到一年前,为了解决某个项目中的某个技术性问题而引入的一套解决方案。该解决方案是在多线程环境下应用生产消费模式的一次实践。

我们先侃侃这生产消费者模式……

笔者以前读过一本书叫<<23种设计模式>>,写的不错,不过现在找不到原书了。于是从网上找来一篇老外对生产消费者模式的描述。

“TheProducer Consumer pattern is an ideal way of separating work that needs to bedone from the execution of that work. As you might guess from its name theProducer Consumer pattern contains two major components,
which are usuallylinked by a queue. This means that the separation of the work that needs doing from theexecution of that work is achieved by the Producer placing items of work on thequeue for later processing instead of dealing with them the moment
they areidentified. The Consumer is then free to remove the work item from the queuefor processing at any time in the future. This decoupling means that Producersdon't care how each item of work will be processed, how many consumers will beprocessing it or
how many other producers there are.
It's a fire andforget world as far as they're concerned. Likewise consumers don't need to knowwhere the work item came from, who put it in the queue, and how many otherproducers and consumers there are. All
they need to do is to grab some workfrom the queue and process it.”

笔者总结一下上面的这段描述,“生产消费者模式就是将一个工作分割成两个部分,并通过队列链接,一部分负责将工作放入队列,一部分负责将队列中的工作拿来处理”。举个简单的例子,一个软件公司,销售人员负责将项目从客户那拉过来,程序员则负责将软件开发出来,公司将产品交给客户。图1是一个简化的生产消费者模型。



图1

图一中生产者和消费者是一对一的,这在实际情况下几乎是不存在的。图2才是真正的生产消费者模型。



图2

我们言归正传……

在项目中,笔者遇到了这样一个问题。笔者的团队开发了一个Service暂命名为小A, 小A封装了一个第三方公司的服务并把自己暴露成了一个SOA服务。公司其他的服务会调用小A去访问第三方公司的一个SOA服务,是不是很绕?各位看官们请看图3便可了解项目架构。



图3

图3中小A会将接收到的内部请求,直接以HTTPS的形式直接转发给第三方公司。所以假设小A同时收到4个来自内部的并发请求时,第三方的服务也会收到4个并发请求。

终于……

某一天,第三方公司的技术人员向笔者抱怨说我们的服务并发太大,并且一次request的记录数只有百十来条,因此他们的服务处理这种case性能不会很好,并且会影响他们的服务。对方技术人员建议笔者增加每个Request的记录条数来降低到己方的并发连接数。

于是笔者脑海中第一个涌现的解决方案就是要求公司其他的服务增加每个Request的记录数,以降低对小A的并发请求。不过笔者很悲催的发现公司其他的服务不会因为小A的这个case而单独增加每个Request的记录数。于是笔者再次脑洞大开,想到生产消费者模式也许能够完美解决这个问题,于是笔者想到了图4的改进方案。



图4

新方案中,小A会持有一个消息队列,小A会将接收到的所有请求记录一股脑的塞到请求消息队列中。小A会根据预设参数启动对应数量的消费者 并行的从队列中取消息发送给第三方的服务,并将返回的结果塞入结果消息队列,以便生产者将结果返回给对应的内部服务请求。

该方案可以很好的控制小A到第三方公司的最大并发请求数以及请求记录的条数。

实现……

消息接口Message.java

/**
* @ClassName: Message
* @Description: Message interface
* @author LL
* @date Jan 13, 2014 4:55:05 PM
*
*/
public interface Message<T> {

/**
*
* @Title: getSize
* @Description: Return message size
* @param @return
* @return int
* @throws
*/
public int getSize();

/**
*
* @Title: getId
* @Description: return message id
* @param @return
* @return String
* @throws
*/
public String getId();

/**
*
* @Title: getMessage
* @Description: TODO
* @param @return
* @return T
* @throws
*/
public T getMessage();
}


消息的实现类AMessage.java

/**
* @ClassName: AMessage
* @Description: The A message
* @author LL
* @date Jan 13, 2014 4:59:45 PM
*
*/
public class AMessage implements Message<List<InputRecord>> {
private String messageId;

private List<InputRecord> inputList;

public AMessage(String messageId, List<InputRecord> inputList) {
this.messageId = messageId;
this.inputList = inputList;
}

/* (non Javadoc)
* <p>Title: getSize</p>
* <p>Description: </p>
* @return
*/
@Override
public int getSize() {
return inputList.size();
}

/* (non Javadoc)
* <p>Title: getId</p>
* <p>Description: </p>
* @return
*/
@Override
public String getId() {
return this.messageId;
}

/* (non Javadoc)
* <p>Title: getMessage</p>
* <p>Description: </p>
* @return
*/
@Override
public List<InputRecord> getMessage() {
return this.inputList;
}

}
NOTE: InputRecord是小A接收的请求记录。

消息队列MsgQueue.java

/**
* @ClassName: MessageQueue
* @Description: message queue
* @author LL
* @date Jan 13, 2014 4:51:18 PM
*
*/
@SuppressWarnings("rawtypes")
public class MsgQueue<E extends Message> extends LinkedBlockingQueue<E> {

/**
* @Fields serialVersionUID : serial id
*/
private static final long serialVersionUID = -4050903454609162888L;

}


生产者MsgProducer.java

/**
* @ClassName: MsgProducer
* @Description: Producer of message
* @author LL
* @date Jan 13, 2014 5:07:10 PM
*
*/
public class MsgProducer {
private MsgQueue<AMessage> messageQueue;

public MsgProducer(MsgQueue<AMessage> messageQueue) {
this.messageQueue = messageQueue;
}

/**
*
* @Title: produce
* @Description: producer会将接收的记录转换内部的消息体供consumer使用
* @param @param inputList
* @param @return
* @return message id (UUID)
* @throws
*/
public String produce(List<InputRecord> inputList) {
String msgID = UUID.randomUUID().toString();
AMessage aMessage = new AMessage(msgID, inputList);
messageQueue.offer(aMessage);

return msgID;
}
}


消费者MsgConsumer.java

/**
* @ClassName: MsgConsumer
* @Description: MsgConsumer
* @author LL
* @date Jan 13, 2014 5:25:22 PM
*
*/
public class MsgConsumer implements Runnable{
private static Logger log = Logger.getLogger(MsgConsumer.class);

private MsgQueue<AMessage> messageQueue;
private BlockingMap<String, List<OutputRecord>> result;

public MsgConsumer(MsgQueue<AMessage> messageQueue, BlockingMap<String, List<OutputRecord>> result) {
this.messageQueue = messageQueue;
this.result = result;
}

/* (non Javadoc)
* <p>Title: run</p>
* <p>Description: </p>
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
while (true) {
try {
List<InputRecord> inputList = new ArrayList<InputRecord>();
Map<String, Integer> indexMap = new HashMap<String, Integer>();
List<String> messageIds = new ArrayList<String>();
int index = 0;
while (true) {
AMessage aMsg = messageQueue.poll(100, TimeUnit.MILLISECONDS);
if (aMsg == null) {
break;
} else if ((inputList.size() + aMsg.getSize()) <= 1000){
inputList.addAll(aMsg.getMessage());
index += aMsg.getMessage().size();
indexMap.put(aMsg.getId(), index);
messageIds.add(aMsg.getId());
} else {
break;
}
}

if (!inputList.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("Invoke flagships server, input size:"+ inputList.size());
log.debug("Index map->"+ JSONUtils.toJson(indexMap));
}
List<OutputRecord> rs= process(inputList);
int i = 0;
for (String id : messageIds) {
List<OutputRecord> outputRecords = new ArrayList<OutputRecord>();
for (int j = i; j < indexMap.get(id); j++) {
outputRecords.add(rs.get(j));
}
result.put(id, outputRecords);
i = indexMap.get(id);
}
}

} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (AException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

try {
Thread.sleep(200);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}


阻塞的Map接口BlockingMap.java (小A所有的request都会通过blocked map取出当前request的result, 并将结果返回给服务调用者)

/**
* @ClassName: BlockingMap
* @Description: Inteface of blocking Map, the method will be blocked
* until there is a value was put.
* @author LL
* @date Jan 13, 2014 2:55:36 PM
*
*/
public interface BlockingMap<K, V> {

public void put(K key, V v) throws InterruptedException;

public V take(K key) throws InterruptedException;

public V poll(K key, long timeout) throws InterruptedException;

}


阻塞的Map的具体实现HashBlockingMap.java (该类参考:http://songsong.iteye.com/blog/802881)

/**
* @ClassName: HashBlockingMap
* @Description: Implement class of Blocking Map
* @author LL
* @date Jan 13, 2014 2:56:34 PM
*
*/
public class HashBlockingMap<K, V> implements BlockingMap<K, V> {
private ConcurrentMap<K, Item<V>> map;

private final ReentrantLock lock = new ReentrantLock();

public HashBlockingMap() {
map = new ConcurrentHashMap<K, Item<V>>();
}

public void put(K key, V v) throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
if (map.containsKey(key)) {
Item<V> item = map.get(key);
item.put(v);
} else {
Item<V> item = new Item<V>();
map.put(key, item);
item.put(v);
}
} finally {
lock.unlock();
}
}

public V take(K key) throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
if (!map.containsKey(key)) {
map.put(key, new Item<V>());
}
} finally {
lock.unlock();
}

Item<V> item = map.get(key);
V x = item.take();
map.remove(key);

return x;
}

public V poll(K key, long timeout) throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
if (!map.containsKey(key)) {
map.put(key, new Item<V>());
}
} finally {
lock.unlock();
}

Item<V> item = map.get(key);
V x = item.poll(timeout);
map.remove(key);

return x;
}

private static class Item<E> {

private final ReentrantLock lock = new ReentrantLock();

private final Condition cond = lock.newCondition();

private E obj = null;

private void put(E o) throws InterruptedException {
if (o == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
obj = o;
cond.signal();
} finally {
lock.unlock();
}
}

E take() throws InterruptedException {
E x;
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (obj == null) {
cond.await();
}
} catch (InterruptedException ie) {
cond.signal();
throw ie;
}
x = obj;
} finally {
lock.unlock();
}
return x;
}

private E poll(long timeout) throws InterruptedException {
timeout = TimeUnit.MILLISECONDS.toNanos(timeout);
E x;
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
if (obj != null) {
x = obj;
break;
}
if (timeout <= 0) {
return null;
}
try {
timeout = cond.awaitNanos(timeout);
} catch (InterruptedException ie) {
cond.signal();
throw ie;
}
}
} finally {
lock.unlock();
}
return x;
}

}
}


展望......

为了达到更好的实时性,MsgConsumer中需要加入对等待超时的条件,当超时条件成立时,即使queue中的消息条数未满足预设条件,Consumer也需要将请求发送给第三方公司服务处理。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: