Spring boot集成websocket实现分布式websocketsession共享(二)--基于阿里云MQ(ONS)消息配置
2018-11-09 16:17
1961 查看
spring boot集成阿里云ONS,增加配置文件类
[code]@Component @ConfigurationProperties(prefix="aliyun.ons") public class MqProperties extends Properties{ /** * 序列化标志 */ private static final long serialVersionUID = 1L; /** * Producer的标识 */ public static final String ProducerId = PropertyKeyConst.ProducerId; /** * key */ public static final String AccessKey = PropertyKeyConst.AccessKey; /** * secret */ public static final String SecretKey = PropertyKeyConst.SecretKey; /** * 消费者 */ public static final String ConsumerId = PropertyKeyConst.ConsumerId; /** * 发送超时时间,单位毫秒 */ public static final String SendMsgTimeoutMillis = PropertyKeyConst.SendMsgTimeoutMillis; /** * 设置 TCP 接入域名(此处以公共云生产环境为例) */ public static final String ONSAddr = PropertyKeyConst.ONSAddr; }
增加配置类
[code]@Configuration @EnableConfigurationProperties(MqProperties.class) public class ONSConfig { /** * 注入mq的配置文件 */ @Autowired private MqProperties mqProperties; /** * 配置生成者的bean * * @return */ @Bean(initMethod = "start", destroyMethod = "shutdown") public Producer proucer() { Producer producerBean = ONSFactory.createProducer(mqProperties); return producerBean; } /** * 配置消费者 * @return */ @Bean(initMethod = "start", destroyMethod = "shutdown") public Consumer consumer() { // 集群订阅方式 (默认) mqProperties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING); Consumer consumer = ONSFactory.createConsumer(mqProperties); return consumer; } }
增加service接口配置
[code]public interface AliyunMessageService { /** * 指定频道和tag发送消息 * @param topic * @param tag 如果发送全频道,传* * @param message * @param uid */ public void sendMessage(String topic,String tag,String message,String uid); /** * 订阅那个频道的那个标签的消息 * @param topic * @param tag */ public void subscription(String topic,String tag,CallBackService callBackService); }
增加接受消息的配置类:
[code]public interface CallBackService { /** * 处理数据 * @param data */ public void dealReceive(String message); }
增加service实现,实现发送和订阅消息
[code]@Service("aliyunMessageService") public class AliyunMessageServiceImpl implements AliyunMessageService{ /** * 日志处理 */ private static final Logger logger = LoggerFactory.getLogger(AliyunMessageServiceImpl.class); /** * 注入生成者 */ @Autowired private Producer producer; /** * 注入配置文件读取类 */ @Autowired public MqProperties mqProperties; /** * 注入消费者 */ @Autowired private Consumer consumer; @Override public void sendMessage(String topic, String tag, String message, String uid) { // TODO Auto-generated method stub try { //解析发送消息 Message msg = new Message( // Message 所属的 Topic topic, // Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在 MQ 服务器过滤 tag, // Message Body 可以是任何二进制形式的数据, MQ 不做任何干预, // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式 message.getBytes("UTF-8")); // 设置代表消息的业务关键属性,请尽可能全局唯一。 // 以方便您在无法正常收到消息情况下,可通过阿里云服务器管理控制台查询消息并补发 // 注意:不设置也不会影响消息正常收发 msg.setKey(uid); SendResult sendResult = producer.send(msg); if (sendResult != null) { logger.info("消息发送成功:"+message+"result"+ sendResult.toString()); } } catch (ONSClientException e) { logger.info("消息发送失败:", e); // 出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。 // TODO 使用redis存储发送内容,然后重新调用发送信息,成功吧删除信息 }catch (Exception e){ e.printStackTrace(); logger.error("发送失败,Topic:{},Tag:{}body为{}",new Object[]{topic,tag,message}); } } @Override public void subscription(String topic,String tag,CallBackService callBackService) { consumer.subscribe( //订阅频道 topic, //订阅的标签,可以是多个 tagA||tagB 或者该频道的全部 * tag, new MessageListener() { //添加订阅监听 public Action consume(Message message, ConsumeContext context) { String data = null; try { byte[] body = message.getBody(); data=new String(body,"UTF-8"); callBackService.dealReceive(data); return Action.CommitMessage; } catch (Exception e) { e.printStackTrace(); logger.error("接受消息,处理消息失败"); return Action.ReconsumeLater; } } }); } }
增加parentHandler类:
[code]public class ParentHandler { /** * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。 */ protected static Map<String,Session> socketMap = new HashMap<String, Session>(); /** * 发送信息给指定用户 * @param clientId * @param message * @return */ public boolean sendMessageToUser(String clientId, String message) { Session session = socketMap.get(clientId); if(session==null) { return false; } if (!session.isOpen()) { return false; } try { synchronized (session) { session.getBasicRemote().sendText(message); } } catch (IOException e) { e.printStackTrace(); } return true; } /** * 广播消息出去 * @param message * @return */ public void sendMessageToAll(String message) { for (Session session : socketMap.values()) { if(session==null||!session.isOpen()) { continue; } try { synchronized (session) { session.getBasicRemote().sendText(message); } } catch (IOException e) { e.printStackTrace(); } } } }
增加websocket的handler的实现
[code]@ServerEndpoint(value = "/webscoket/call/{ID}") @Component public class CTIHandler extends ParentHandler implements CallBackService{ /** * 配置日志 */ private final static Logger logger = LoggerFactory.getLogger(CTIHandler.class); /** * 连接 * @param session * @throws Exception */ @OnOpen public void onOpen(Session session,@PathParam("ID") String userName,EndpointConfig config) throws Exception { logger.info("获取当前"+socketMap.get(userName)); if(socketMap.get(userName)==null) { socketMap.put(userName,session); } AliyunMessageService aliyunMessageService = (AliyunMessageService) SpringUtil.getBean("aliyunMessageService"); //根据用户角色匹配响应的频道和标签,然后进行消息订阅 aliyunMessageService.subscription("DEV_ZHUBANXIAN_MQ", "T2", this); logger.info("链接成功"); } /** * 接收socket信息 */ @OnMessage public void onMessage(String message, Session session) throws Exception { logger.info("收到的消息"+message); } /** * 连接出错 */ @OnError public void onError(Session session, Throwable error) throws Exception { if (session.isOpen()) { session.close(); } logger.info("连接出错"+error); } /** * 连接关闭 */ @OnClose public void onClose(@PathParam("ID") String userName) throws Exception { if(socketMap.get(userName)!=null) { socketMap.remove(userName); } logger.info("连接已关闭:" + userName); } @Override public void dealReceive(String message) { // TODO Auto-generated method stub sendMessageToUser("sales01", message); } }
阅读更多
相关文章推荐
- 基于tomcat 7.0.68 的websocket 实现,及通过 HttpSessionId 实现websocket session 共享
- 项目分布式部署那些事(1):ONS消息队列、基于Redis的Session共享,开源共享
- websocket 分布式开发,websocket session不支持序列化,无法存储至radis
- 基于纯Java代码的Spring容器和Web容器零配置的思考和实现(2) - 静态资源、视图和消息器
- python基于protobu+websocket+tornado实现多种二进制协议消息的接收发送和解析demo
- java分布式开发TCP/IP NIO无阻塞 Socket((基于消息方式实现系统间的通信) )(转)
- 基于JMS消息中间件的分布式系统初探究(一) - 通过JMS实现Web服务器与服务框架的通讯
- 详解之:linux下tomcat、nginx的负载均衡及memcached对session共享的实现配置详细总结
- 详解之:linux下tomcat、nginx的负载均衡及memcached对session共享的实现配置详细总结
- 基于Nodejs和Mongodb的web框架并集成Apache Solr和RabbitMQ—SOWF
- 【简单Web服务器搭建】基于Socket实现的最简单的Web服务器【ASP.NET原理分析】
- 基于RHCS+ISCSI+CLVM实现web服务的共享存储集群架构
- PHP实现多web服务器共享SESSION数据
- Nginx+Tomcat+Memecached实现session共享配置
- Apache+tomcat实现负载均衡集群和session共享、tengine+tomcat实现web动静分离
- PHP实现多web服务器共享SESSION数据-session数据写入mysql数据库
- 分布式 php实现session共享
- PHP实现多web服务器共享SESSION数据-session数据写入mysql数据库
- 【转】【简单Web服务器搭建】基于Socket实现的最简单的Web服务器【ASP.NET原理分析】
- 关于基于web浏览器实现分布式计算构想