您的位置:首页 > 其它

[整理]阿里云之消息队列的使用

2016-06-20 20:27 459 查看
环境准备

1.申请阿里云账号,开通消息队列服务,申请accessKey。

2.SDK导入,Topic、发布和订阅申请

发布端代码

工具类:

import com.aliyun.openservices.ons.api.*;

import java.util.Properties;

/**
* Created by Youme on 2016/6/20.
*/
public class CloudMQUtil {

private static final String ACCESS_KEY = "XXXXXXXXX"; //你自己的AccessKey
private static final String SECRET_KEY = "XXXXXXXXX"; //你自己的SecretKey

/**
* 获取消息的 Producer
*
* @param producerId producerId
* @return Producer
*/
public static Producer getProducer(String producerId) {
Properties properties = new Properties();
properties.put(PropertyKeyConst.ProducerId, producerId);
properties.put(PropertyKeyConst.AccessKey, ACCESS_KEY);
properties.put(PropertyKeyConst.SecretKey, SECRET_KEY);
Producer producer = ONSFactory.createProducer(properties);

// 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
producer.start();
return producer;
}
}
调用:

Producer producer = CloudMQUtil.getProducer("XXXXXXX"); //你申请的producerId
Message msg = new Message("XXXXXXX", //你申请的TopicName
"tag_003", "key_003", "这是消息".getBytes());
producer.send(msg);


订阅端代码

MessageListenser实现类:

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;

import java.util.Date;

/**
* Created by Youme on 2016/6/20.
*/
public class MyMessageListener implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext consumeContext) {
System.out.println("Receive @" + new Date() + ": " + message);

String body = new String(message.getBody());
System.out.println("msgBody is : " + body);
return Action.CommitMessage;
}
}
订阅器:

import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.util.Date;
import java.util.Properties;

/**
* Created by Youme on 2016/6/20.
*/
public class MyMessageConsumer {
private static final String ACCESS_KEY = "XXXXXXXX"; //你自己的AccessKey
private static final String SECRET_KEY = "XXXXXXXX"; //你自己的AccessKey
private static final String TOPIC_NAME = "XXXXXXXX"; //你申请的TopicName
private static final String CONSUMER_ID = "XXXXXXXX"; //你申请的ConsumerId

/**
* 订阅消息
*/
public void subscribe() {
Properties properties = new Properties();
properties.put(PropertyKeyConst.ConsumerId, CONSUMER_ID);
properties.put(PropertyKeyConst.AccessKey, ACCESS_KEY);
properties.put(PropertyKeyConst.SecretKey, SECRET_KEY);
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe(TOPIC_NAME, "*", new PointMessageListener());
consumer.start();
System.out.println(CONSUMER_ID + " is running @" + new Date());
}
}
调用:

new MyMessageConsumer().subscribe(); // 系统启动的时候启动订阅
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  消息队列 MQ 阿里云