您的位置:首页 > 编程语言 > Java开发

Spring boot集成websocket实现分布式websocketsession共享(二)--基于阿里云MQ(ONS)消息配置

2018-11-09 16:17 1961 查看

spring boot集成阿里云ONS,增加配置文件类

[code]@Component
@ConfigurationProperties(prefix="aliyun.ons")
public class MqProperties extends Properties{

/**
* 序列化标志
*/
private static final long serialVersionUID = 1L;
/**
* Producer的标识
*/
public static final String ProducerId = PropertyKeyConst.ProducerId;
/**
* key
*/
public static final String AccessKey = PropertyKeyConst.AccessKey;
/**
* secret
*/
public static final String SecretKey = PropertyKeyConst.SecretKey;
/**
* 消费者
*/
public static final String ConsumerId = PropertyKeyConst.ConsumerId;
/**
* 发送超时时间,单位毫秒
*/
public static final String SendMsgTimeoutMillis = PropertyKeyConst.SendMsgTimeoutMillis;
/**
* 设置 TCP 接入域名(此处以公共云生产环境为例)
*/
public static final String ONSAddr = PropertyKeyConst.ONSAddr;

}

增加配置类

[code]@Configuration
@EnableConfigurationProperties(MqProperties.class)
public class ONSConfig {
/**
* 注入mq的配置文件
*/
@Autowired
private MqProperties mqProperties;

/**
* 配置生成者的bean
*
* @return
*/
@Bean(initMethod = "start", destroyMethod = "shutdown")
public Producer proucer() {
Producer producerBean = ONSFactory.createProducer(mqProperties);
return producerBean;
}
/**
* 配置消费者
* @return
*/
@Bean(initMethod = "start", destroyMethod = "shutdown")
public Consumer consumer() {
// 集群订阅方式 (默认)
mqProperties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
Consumer consumer = ONSFactory.createConsumer(mqProperties);
return consumer;
}
}

增加service接口配置

[code]public interface AliyunMessageService {
/**
* 指定频道和tag发送消息
* @param topic
* @param tag    如果发送全频道,传*
* @param message
* @param uid
*/
public void sendMessage(String topic,String tag,String message,String uid);

/**
* 订阅那个频道的那个标签的消息
* @param topic
* @param tag
*/
public void subscription(String topic,String tag,CallBackService callBackService);

}

增加接受消息的配置类:

[code]public interface CallBackService {
/**
* 处理数据
* @param data
*/
public void dealReceive(String message);
}

增加service实现,实现发送和订阅消息

[code]@Service("aliyunMessageService")
public class AliyunMessageServiceImpl implements AliyunMessageService{
/**
* 日志处理
*/
private static final Logger logger = LoggerFactory.getLogger(AliyunMessageServiceImpl.class);
/**
* 注入生成者
*/
@Autowired
private Producer producer;
/**
* 注入配置文件读取类
*/
@Autowired
public MqProperties mqProperties;
/**
* 注入消费者
*/
@Autowired
private Consumer consumer;

@Override
public void sendMessage(String topic, String tag, String message, String uid) {
// TODO Auto-generated method stub
try {
//解析发送消息
Message msg = new Message(
// Message 所属的 Topic
topic,
// Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在 MQ 服务器过滤
tag,
// Message Body 可以是任何二进制形式的数据, MQ 不做任何干预,
// 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
message.getBytes("UTF-8"));
// 设置代表消息的业务关键属性,请尽可能全局唯一。
// 以方便您在无法正常收到消息情况下,可通过阿里云服务器管理控制台查询消息并补发
// 注意:不设置也不会影响消息正常收发
msg.setKey(uid);
SendResult sendResult = producer.send(msg);
if (sendResult != null) {
logger.info("消息发送成功:"+message+"result"+ sendResult.toString());
}
} catch (ONSClientException e) {
logger.info("消息发送失败:", e);
// 出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。
// TODO 使用redis存储发送内容,然后重新调用发送信息,成功吧删除信息
}catch (Exception e){
e.printStackTrace();
logger.error("发送失败,Topic:{},Tag:{}body为{}",new Object[]{topic,tag,message});
}
}

@Override
public void subscription(String topic,String tag,CallBackService callBackService) {
consumer.subscribe(
//订阅频道
topic,
//订阅的标签,可以是多个 tagA||tagB 或者该频道的全部 *
tag,
new MessageListener() { //添加订阅监听
public Action consume(Message message, ConsumeContext context) {
String data = null;
try {
byte[] body = message.getBody();
data=new String(body,"UTF-8");
callBackService.dealReceive(data);
return Action.CommitMessage;
} catch (Exception e) {
e.printStackTrace();
logger.error("接受消息,处理消息失败");
return Action.ReconsumeLater;
}
}
});
}

}

增加parentHandler类:

[code]public class ParentHandler {
/**
* concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
*/
protected static Map<String,Session> socketMap = new HashMap<String, Session>();

/**
* 发送信息给指定用户
* @param clientId
* @param message
* @return
*/
public  boolean sendMessageToUser(String clientId, String message) {
Session session = socketMap.get(clientId);
if(session==null) {
return false;
}
if (!session.isOpen()) {
return false;
}
try {
synchronized (session) {
session.getBasicRemote().sendText(message);
}
} catch (IOException e) {
e.printStackTrace();
}
return true;
}
/**
* 广播消息出去
* @param message
* @return
*/
public void sendMessageToAll(String message) {
for (Session session : socketMap.values()) {
if(session==null||!session.isOpen()) {
continue;
}
try {
synchronized (session) {
session.getBasicRemote().sendText(message);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

增加websocket的handler的实现

[code]@ServerEndpoint(value = "/webscoket/call/{ID}")
@Component
public class CTIHandler extends ParentHandler implements CallBackService{
/**
* 配置日志
*/
private final static Logger logger = LoggerFactory.getLogger(CTIHandler.class);
/**
* 连接
* @param session
* @throws Exception
*/
@OnOpen
public void onOpen(Session session,@PathParam("ID") String userName,EndpointConfig config) throws Exception {
logger.info("获取当前"+socketMap.get(userName));
if(socketMap.get(userName)==null) {
socketMap.put(userName,session);
}
AliyunMessageService aliyunMessageService =  (AliyunMessageService) SpringUtil.getBean("aliyunMessageService");
//根据用户角色匹配响应的频道和标签,然后进行消息订阅
aliyunMessageService.subscription("DEV_ZHUBANXIAN_MQ", "T2", this);
logger.info("链接成功");
}

/**
* 接收socket信息
*/
@OnMessage
public void onMessage(String message, Session session) throws Exception {
logger.info("收到的消息"+message);
}

/**
* 连接出错
*/
@OnError
public void onError(Session session, Throwable error) throws Exception {
if (session.isOpen()) {
session.close();
}
logger.info("连接出错"+error);
}
/**
* 连接关闭
*/
@OnClose
public void onClose(@PathParam("ID") String userName) throws Exception {
if(socketMap.get(userName)!=null) {
socketMap.remove(userName);
}
logger.info("连接已关闭:" + userName);
}

@Override
public void dealReceive(String message) {
// TODO Auto-generated method stub
sendMessageToUser("sales01", message);
}
}

 

阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐