您的位置:首页 > 数据库 > Redis

使用MQTT协议+Redis缓存实现APP登录顶号功能

2016-11-18 10:54 681 查看
大家在玩游戏或使用QQ等IM工具时,想必都见到过弹出被顶号或者是您的账号于xx时间在另一设备登录,您已被迫下线这样的提示,然后不得不点退出按钮退出整个应用,或者点击重新登录把另一设备再顶下来。最近我参与的一个项目,正好就有这样的需求,而且,由于我们项目中已经使用到了MQTT协议进行消息推送,实现远程控制,后台用Java实现,缓存使用了Redis,因此,正好可以利用现有的技术来实现这个功能。

实现的思路大概如下:首先,登录时不仅需要账号密码,还可以将设备关键信息记录下来,如设备型号(Android|iPhone)、登录时间、登录IP、设备唯一标识(UUID)等,这就需要前台登录功能与后台接口一起配合实现,并在后台把userId已经相关设备信息保存到Redis中,当在另外一台新设备上登录同一帐号时,将userId对应的相关登录设备信息直接进行覆盖,此时如果旧设备进行重连时,因为该uuid已经不是当前服务端的uuid了,所以直接返回下线通知,为了进行友好提示,也可以将新登录设备的主要信息(设备型号、登录时间)进行返回。

下面简单介绍一下实现的方法。

软件安装

Linux下mqtt服务器Apollo的安装

下载

选择一个目录用来下载保存

下载地址: http://activemq.apache.org/apollo/download.html

官网教程: http://activemq.apache.org/apollo/documentation/getting-started.html

目前版本是 apache-apollo-1.7.1-unix-distro.tar .gz

创建broker

一个broker实例是一个文件夹,其中包含所有的配置文件及运行时的数据,不如日志和消息数

据。Apollo强烈建议不要把实例同安装文件放在一起。在linux操作系统下面,建议将实例建在

/var/lib/目录下面

首先解压:tar -zxvf apache-apollo-1.7.1-unix-distro.tar.gz

选择一个目录存放解压后的文件,我放在了/server/下,解压后的文件夹为 apache-apollo-1.7.1

开始创建broker实例:

cd /var/lib
sudo /server/apache-apollo-1.7.1/bin/apollo create mybroker


下图是Apache官方给的一些建议截图:



启动broker实例

启动broker实例可以有两种方法,如下图中所示:



可以执行

/var/lib/mybroker/bin/apollo-broker run


或者

sudo ln -s "/var/lib/mybroker/bin/apollo-broker-service" /etc/init.d/
/etc/init.d/apollo-broker-service start


使其作为一个service进行启动,以后系统重启后只需运行/etc/init.d/apollo-broker-service start

访问Apollo的监控页面: http://localhost:61680/ 默认用户名、密码为为 admin/password

Linux下Redis的安装与配置

Redis的安装非常简单,已经有现成的Makefile文件,解压后在src目录下使用make命令完成编译即可,redis-benchmark、redis-cli、redis-server、redis-stat 这四个文件,加上一个 redis.conf 就构成了整个redis的最终可用包。它们的作用如下:

redis-server:Redis服务器的daemon启动程序

redis-cli:Redis命令行操作工具。当然,你也可以用telnet根据其纯文本协议来操作

redis-benchmark:Redis性能测试工具,测试Redis在你的系统及你的配置下的读写性能

redis-stat:Redis状态检测工具,可以检测Redis当前状态参数及延迟状况

下载安装:

wget http://download.redis.io/redis-stable.tar.gz tar xzf redis-stable.tar.gz
cd redis-stable
make
make install


启动

编译后生成的可执行文件:

redis-server 是Redis的服务器,启动Redis即运行redis-server

redis-cli 是Redis自带的Redis命令行客户端,学习Redis的重要工具

./redis-server & 不指定配置直接运行,这时采用默认配置,无密码

./redis-server –port 6379 仅指定端口

./redis-server ../redis.conf 指定配置文件

最好还是使用最后一种方式进行启动

如果只是在本机连接,那麽使用默认配置文件不会有什么问题,但是,如果是连接远程服务器端的Redis,则需要对配置文件进行一些修改:

requirepass foobared
#bind 127.0.0.1    ##注释掉
protected-mode no  ##从yes改成no


至于如何将Redis设置后台服务,开机自启等,这里就不介绍了,可以去搜索一下。

功能实现

后台接口

Redis客户端使用的是Jedis,如下代码是一个对Jedis简单的封装

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.exceptions.JedisException;

import java.util.ResourceBundle;

/**
* Jedis Cache 工具类
*/
public class JedisUtils {

private static Logger logger = LoggerFactory.getLogger(JedisUtils.class);

private static JedisPool jedisPool;

/**
*  读取相关的配置
*/
static {
ResourceBundle resourceBundle = ResourceBundle.getBundle("redis");
int maxActive = Integer.parseInt(resour
4000
ceBundle.getString("redis.pool.maxActive"));
int maxIdle = Integer.parseInt(resourceBundle.getString("redis.pool.maxIdle"));
int maxWait = Integer.parseInt(resourceBundle.getString("redis.pool.maxWait"));
int port = Integer.parseInt(resourceBundle.getString("redis.port"));
int timeout = Integer.parseInt(resourceBundle.getString("redis.timeout"));
String ip = resourceBundle.getString("redis.ip");
String auth = resourceBundle.getString("redis.auth");

JedisPoolConfig config = new JedisPoolConfig();
//设置最大连接数
config.setMaxTotal(maxActive);
//设置最大空闲数
config.setMaxIdle(maxIdle);
//设置超时时间
config.setMaxWaitMillis(maxWait);

//初始化连接池
jedisPool = new JedisPool(config, ip, port, timeout, auth);
}

/**
* 获取缓存
* @param key 键
* @return 值
*/
public static String get(String key) {
String value = null;
Jedis jedis = null;
try {
jedis = getResource();
if (jedis.exists(key)) {
value = jedis.get(key);
value = StringUtils.isNotBlank(value) && !"nil".equalsIgnoreCase(value) ? value : null;
logger.debug("get {} = {}", key, value);
}
} catch (Exception e) {
logger.warn("get {} = {}", key, value, e);
} finally {
returnResource(jedis);
}
return value;
}

/**
* 设置缓存
* @param key 键
* @param value 值
* @param cacheSeconds 超时时间,0为不超时
* @return
*/
public static String set(String key, String value, int cacheSeconds) {
String result = null;
Jedis jedis = null;
try {
jedis = getResource();
result = jedis.set(key, value);
if (cacheSeconds != 0) {
jedis.expire(key, cacheSeconds);
}
logger.debug("set {} = {}", key, value);
} catch (Exception e) {
logger.warn("set {} = {}", key, value, e);
} finally {
returnResource(jedis);
}
return result;
}

/**
* 删除缓存
* @param key 键
* @return
*/
public static long del(String key) {
long result = 0;
Jedis jedis = null;
try {
jedis = getResource();
if (jedis.exists(key)){
result = jedis.del(key);
logger.debug("del {}", key);
}else{
logger.debug("del {} not exists", key);
}
} catch (Exception e) {
logger.warn("del {}", key, e);
} finally {
returnResource(jedis);
}
return result;
}

/**
* 缓存是否存在
* @param key 键
* @return
*/
public static boolean exists(String key) {
boolean result = false;
Jedis jedis = null;
try {
jedis = getResource();
result = jedis.exists(key);
logger.debug("exists {}", key);
} catch (Exception e) {
logger.warn("exists {}", key, e);
} finally {
returnResource(jedis);
}
return result;
}

/**
* 获取资源
* @return
* @throws JedisException
*/
public static Jedis getResource() throws JedisException {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
} catch (JedisException e) {
logger.warn("getResource.", e);
returnBrokenResource(jedis);
throw e;
}
return jedis;
}

/**
* 归还资源
* @param jedis
*/
public static void returnBrokenResource(Jedis jedis) {
if (jedis != null) {
jedisPool.returnBrokenResource(jedis);
}
}

/**
* 释放资源
* @param jedis
*/
public static void returnResource(Jedis jedis) {
if (jedis != null) {
jedisPool.returnResource(jedis);
}
}
}


然后在登录接口中,当判断完登录的用户名密码正确后,可以参考如下代码的思路去实现,首先判断Redis中是否已保存有这个userId对用的值,有的话说明当前已经有登录,需要被替换到,同时使用MQTT发送消息给客户端使其退出,Redis中不存在则只需保存userId和uuidStr即可

String uuidStr = "";  //这个值从APP端传过来

// 先判断Redis中是否已经有,有的话需要替换掉
if(JedisUtils.get(userId) != null && !JedisUtils .get(userId).equals(uuidStr)) {
MqttClient client = MyMqttClient.getInstance();
String topic = "TOPIC/LOGIN_LOGOUT";
client.subscribe(topic, 1);
MyMqttClient.sendMessage("Log out", topic);
client.unsubscribe(topic);
}

JedisUtils.set(userId, uuidStr, 0);


至于MQTT协议的实现,这里使用的是Paho,如果后台项目是使用Maven构建的话,在pom.xml中加入如下几行即可:

<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.1.0</version>
</dependency>


然后对其进行了一个简单的封装

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MyMqttClient {

private MyMqttClient() {}

private static MqttClient mqttClientInstance = null;

private static MqttConnectOptions options;

//静态工厂方法
public static synchronized MqttClient getInstance() {
try {
if (mqttClientInstance == null) {
mqttClientInstance = new MqttClient("tcp://125.216.242.151:61613",
MqttClient.generateClientId(), new MemoryPersistence());

options = new MqttConnectOptions();
//设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
//设置连接的用户名
options.setUserName("admin");
//设置连接的密码
options.setPassword("password".toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);

mqttClientInstance.connect(options);
}

return mqttClientInstance;

}catch (Exception e){
e.printStackTrace();
return null;
}
}

public static void sendMessage(String content, String myTopic) {
MqttTopic topic = getInstance().getTopic(myTopic);
MqttMessage message = new MqttMessage();
message.setQos(1);
message.setRetained(false);
message.setPayload(content.getBytes());

try {
MqttDeliveryToken token = topic.publish(message);
} catch (MqttException e) {
e.printStackTrace();
}
}

public static MqttConnectOptions getOptions(){
return options;
}

}


app端

客户端的做法思路也很简单,由于使用了MQTT,因此客户端和服务器端其实已经保持了一个长连接,可以为客户端写一个MQTTService,随时监听服务器推送过来的消息进行处理

//为MTQQ client设置回调
client.setCallback(new MqttCallback() {

@Override
public void connectionLost(Throwable cause) {
//连接丢失后,一般在这里面进行重连
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
//publish后会执行到这里
}

@Override
public void messageArrived(String topicName, MqttMessage message) throws Exception {

if(message.toString().equals("Log out")) {

handler.post(new Runnable() {
@Override
public void run() {
AlertDialog.Builder builder = new AlertDialog.Builder(getApplicationContext());
builder.setMessage("被顶号了");
builder.setNegativeButton("退出", new DialogInterface.OnClickListener() {
@Override
public void onClick(DialogInterface dialog, int which) {
// TODO 退出当前账号,在这里简单粗暴的结束了应用
stopSelf();
android.os.Process.killProcess(android.os.Process.myPid());
}
});
Dialog dialog = builder.create();
dialog.setCanceledOnTouchOutside(false);
dialog.getWindow().setType(WindowManager.LayoutParams.TYPE_SYSTEM_ALERT);
dialog.show();
}
});
}
}
});


总结

上述代码可能在严谨性和可靠性上还会存在一些问题,还需要经过不断的完善,但思路是很明确的。在这里尤其要安利一下MTQQ,现在越来越多的产品都是基于这个协议进行开发,进行消息推送等。它开销很小,支持各种流行编程语言,能够适应不稳定的网络传输需求,在未来几年,相信MQTT的应用会越来越广。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  redis mqtt