您的位置:首页 > 其它

JMS生产者+单线程发送-我们到底能走多远系列(29)

2013-07-10 14:41 477 查看

我们到底能走多远系列(29)

扯淡:

  “然后我俩各自一端/望着大河弯弯/终于敢放胆/嘻皮笑脸/面对/人生的难” --- 《山丘》

  “迎着风/迎向远方的天空/路上也有艰难/也有那解脱/都走得从容” --- 《与你到永久》

  “遇上冷风雨休太认真/自信满心里休理会讽刺与质问/笑骂由人洒脱地做人/少年人洒脱地做人/继续行洒脱地做人” ---《沉默是金》

  

主题:

  使用JMS将共通模块分离出去,比如发短信模块,可以在远程的机器上跑customer,然后各个应用使用发短信功能是只要向远程机器发送msg即可。

  类似于下图:



  对于图中的Producer的实现都差不多,主要是选择什么样的Jms第三方实现。对于Customer我们不必关心.

  比如下面的代码是HornetQ的Producer的样例代码:

public class JmsProducer implements ExceptionListener,FailureListener{

private final Logger logger = LoggerFactory.getLogger(JmsProducer.class);
private String queueName;
private String jmsHost;
private int jmsPort;
private ConnectionFactory cf;
private Queue queue;
private Connection queueConnection;
private Session queueSession;
private MessageProducer queueProducer;

public void init() throws Exception {
queue = HornetQJMSClient.createQueue(queueName);
Map<String, Object> connectionParams = new HashMap<String, Object>();
connectionParams.put(TransportConstants.PORT_PROP_NAME, jmsPort);
connectionParams.put(TransportConstants.HOST_PROP_NAME, jmsHost);
TransportConfiguration transportConfiguration = new TransportConfiguration(NettyConnectorFactory.class.getName(),
connectionParams);
HornetQConnectionFactory hornetQConnectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, transportConfiguration);
hornetQConnectionFactory.setClientFailureCheckPeriod(60000);
hornetQConnectionFactory.setRetryInterval(2000);          // 2 seconds for first retry
hornetQConnectionFactory.setRetryIntervalMultiplier(1.5); // 1.5 times loner betrween retrys
hornetQConnectionFactory.setMaxRetryInterval(20000);      // Wait max 20 secs between retrys
hornetQConnectionFactory.setReconnectAttempts(-1);        // Retry forever
hornetQConnectionFactory.setConnectionTTL(60000);         //The default value for connection ttl is 60000ms
hornetQConnectionFactory.setClientFailureCheckPeriod(30000);//The default value for client failure check period is 30000ms
cf = (ConnectionFactory)hornetQConnectionFactory;
queueConnection = cf.createConnection();
queueSession = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queueProducer = queueSession.createProducer(queue);
queueProducer.setTimeToLive(6000000);//100分钟失效
queueProducer.setDisableMessageID(true);//关闭消息id
queueProducer.setDisableMessageTimestamp(true);//关闭消息的时间戳
logger.info("init JmsProducer of "+queueName);
//queueConnection.start();
}

public void reConnect(){
logger.info(queueName+" reConnect");
}

public void destroy() throws Exception {
logger.info("destroy JmsProducer of "+queueName);
if(queueSession != null){
queueSession.close();
queueSession = null;
}
if(queueConnection != null){
queueConnection.close();
queueConnection = null;
}
}

public String getQueueName() {
return queueName;
}

public void setQueueName(String queueName) {
this.queueName = queueName;
}

public String getJmsHost() {
return jmsHost;
}

public void setJmsHost(String jmsHost) {
this.jmsHost = jmsHost;
}

public int getJmsPort() {
return jmsPort;
}

public void setJmsPort(int jmsPort) {
this.jmsPort = jmsPort;
}

public Session getQueueSession() {
return queueSession;
}

public void sendTextMessage(final TextMessage textMessage) throws JMSException {
try {
queueProducer.send(textMessage);
} catch (Exception e) {
// TODO: handle exception
logger.error("on sendTextMessage Exception="+e.getMessage());
}
}

public void onException(JMSException jmsex) {
// TODO Auto-generated method stub
logger.warn("on JmsProducer Exception="+jmsex.getMessage());
}

public void connectionFailed(HornetQException hqex, boolean arg1) {
// TODO Auto-generated method stub
logger.error("on JmsProducer connectionFailed,arg1="+arg1+",Exception="+hqex.getMessage());
}
}


  一般性的,我们利用Spring 把这个JmsProducer 注入进自己的业务类里去使用即可:

  spring的bean配置:

<bean id="jmsCodeProducer" class="com.sz.lvban.biz.bo.util.JmsProducer" init-method="init">
<property name="jmsHost" value="${jms.send.code.host}" />
<property name="jmsPort" value="${jms.send.code.port}" />
<property name="queueName" value="${jms.send.code.queueName}" />
</bean>


@Autowired
private JmsProducer                  jmsCodeProducer;


  某方法直接调用:

textMsg = jmsCodeProducer.getQueueSession().createTextMessage();
textMsg.setText(smsJson.toJSONString());
jmsCodeProducer.sendTextMessage(textMsg);


  这样就实现了让服务器上的customer干活的工作了。

然后,我们发现spring默认注入jmsCodeProducer使用了单例的模式,这样一来我们就可能考虑多线程调用冲突的问题。然而我们不能避免jmsCodeProducer的单例,毕竟init-method="init" 的init方法有点消耗的。

所以就搞了下面的方案:(上图的题部分)



使用一个queue做中间站,只要保证单线程从queue中取数据,就能实现一条条向远程服务器发送jms消息。

下面是一个实现的例子:

我们先配置一个ApnsMsgSender,有他来控制queue的行为,包括插入,取出数据,以及发送jms消息。

<bean id="ApnsMsgSender" class="com.sz.wxassistant.biz.bo.util.ApnsMsgSender" init-method="sendMsg">
<property name="jmsApnsProducer" ref="jmsApnsProducer"></property>
</bean>


注意init-method="sendMsg" 启动时,我们就需要启动一个线程来监控queue。

结合下代码:在这里我们使用了LinkedBlockingQueue,关于ArrayBlockingQueue和LinkedBlockingQueue之间的取舍,我没有实际测试过。

public class ApnsMsgSender {

// private ArrayBlockingQueue<TextMessage> queue = new
// ArrayBlockingQueue<TextMessage>(1024);
private LinkedBlockingQueue<TextMessage> jmsQueue = new LinkedBlockingQueue<TextMessage>();
private JmsProducer                      jmsApnsProducer;
private ExecutorService                  pool;
private Logger                           log      = LoggerFactory.getLogger("ApnsMsgSender");

/**
* 启动入口
*/
public void sendMsg() {
pool = Executors.newCachedThreadPool(new MyThreadFactory());
pool.submit(new JmsSender());
}

public boolean addJms(TextMessage msg) {
return jmsQueue.offer(msg);
}

public TextMessage getMsg() {
TextMessage msg = null;
try {
// 取msg 10秒超时设置
msg = jmsQueue.poll(10, TimeUnit.SECONDS);
} catch (InterruptedException interuptedE) {
log.warn("poll jms error" + interuptedE);
} catch (Exception e) {
log.error("poll jms get unknown error: ", e);
}
return msg;
}

public JmsProducer getJmsApnsProducer() {
return jmsApnsProducer;
}

public void setJmsApnsProducer(JmsProducer jmsApnsProducer) {
this.jmsApnsProducer = jmsApnsProducer;
}

public TextMessage genTextMessage() throws JMSException {
return jmsApnsProducer.getQueueSession().createTextMessage();
}

private class JmsSender implements Runnable {
public void run() {
while (true) {
try {
// 从queue中取msg
TextMessage msg = getMsg();
if (msg != null && msg instanceof TextMessage) {
// 发送
jmsApnsProducer.sendTextMessage(msg);
}
} catch (JMSException jmsE) {
log.error("send jms error: " + jmsE);
} catch (Exception e) {
log.error("get unknown error: ", e);
}
}
}
}

class MyThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
// 线程设置为后台进程
thread.setDaemon(true);
thread.setName("ApnsMsgSender");
return thread;
}
}
}


解释:

  我们的这个线程做了什么?

    1,  getMsg()

    2,  sendTextMessage(msg)

  很明了的实现....


注:代码中还使用了ThreadFactory 来封装了一下线程。

  外界代码调用怎么搞?

    1,addJms 就可以了

只负责向queue里放,这样再多的线程都没有关系了。

TextMessage msg = apnsMsgSender.genTextMessage();
msg.setText("I love los angeles !");
apnsMsgSender.addJms(msg);


ok。到这里就实现了单线程发送jms消息的功能。

让我们继续前行

----------------------------------------------------------------------

努力不一定成功,但不努力肯定不会成功。
共勉。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: