您的位置:首页 > 移动开发 > Android开发

Android中即时通讯协议选择

2016-05-08 18:57 597 查看

Android中的即时通讯,就是客户端要与服务器建立长时间的连接,正常情况下Android中请求服务器数据后,连接一般断开。但即时通讯类的对消息的实时性要求比较高。需要客户端与服务器建立长时间的连接,这样,才能保证消息的时实性。



1、XMPP协议

1、XMPP基于xml,XMPP用TCP传递的是xml流。它具有xml的灵活性和扩展性。可以实现服务类实时通讯。

XMPP的核心XML流传输协议的定义使得XMPP能够在一个比以往网络通信协议更规范的平台上。借助于XML易于解析和阅读的特性。

XMPP核心协议通信的基本模式就是先建立一个stream,然后协商一堆安全之类的东西,中间通信过程就是客户端发送XML Stanza,一个接一个的。服务器根据客户端发送的信息以及程序的逻辑,发送XML Stanza给客户端。但是这个过程并不是一问一答的,任何时候都有可能从一方发信给另外一方。通信的最后阶段是关闭流,关闭TCP/IP连接。

1、配置

public class XmppContacts {

public static final String XMPP_HOST = "x.x.x.x"; // XMPP服务器ip地址
public static final int XMPP_PORT = 8088; // XMPP服务器客户端访问端口号
public static final int XMPP_PORT_MANAGER = 8099; // XMPP服务器后台访问端口号
public static final String XMPP_NAME = "localhost.localhost"; // XMPP服务器名称
private static final String XMPP_CLIENT_RESOURCE = "Smack"; // XMPP客户端名称


2、创建连接

private static boolean openConnection() {
// 建立连接配置信息对象
ConnectionConfiguration connConfig = new ConnectionConfiguration(
XmppContacts.XMPP_HOST, XmppContacts.XMPP_PORT,
XmppContacts.XMPP_NAME);
// 允许重复连接
connConfig.setReconnectionAllowed(true);
// 其他配置
connConfig.setSendPresence(false);
connConfig
.setSecurityMode(ConnectionConfiguration.SecurityMode.enabled);
connConfig.setDebuggerEnabled(XmppContacts.XMPP_DEBUG_MODEL); // 调试模式
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.ICE_CREAM_SANDWICH) {
connConfig.setTruststoreType("AndroidCAStore");
connConfig.setTruststorePassword(null);
connConfig.setTruststorePath(null);
} else {
connConfig.setTruststoreType("BKS");
String path = System.getProperty("javax.net.ssl.trustStore");
if (path == null)
path = System.getProperty("java.home") + File.separator + "etc"
+ File.separator + "security" + File.separator
+ "cacerts.bks";
connConfig.setTruststorePath(path);
}

// 创建连接对象
conn = new XMPPConnection(connConfig);
try {
conn.connect();
// 配置各种Provider
configureConnection(ProviderManager.getInstance());
return true;
} catch (XMPPException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
return false;
}


关闭连接

/**
* 关闭XMPP连接
*/
public static void closeConnection() {
if (conn.isConnected()) {
System.out.println("---------xmpp将要关闭");
conn.disconnect();
}
conn = null;
}


2、MQTT协议

MQTT(Message Queuing Telemetry Transport)是IBM开发的一个即时通讯的协议。

MQTT的特点:

1、使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;

2、对负载内容屏蔽的消息传输;

3、使用 TCP/IP 提供网络连接;

4、有三种消息发布服务质量:

“至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。

“至少一次”,确保消息到达,但消息重复可能会发生。

“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。

5、小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量;

MQTT非常适合作为Android手机客户端与服务器推送消息。其中SohuCmstop手机客户端中均有使用到MQTT作为消息推送消息。据Cmstop主要负责消息推送的高级研发工程师李文凯称,随着移动互联网的发展,MQTT由于开放源代码,耗电量小等特点,将会在移动消息推送领域会有更多的贡献,在物联网领域,传感器与服务器的通信,信息的收集,MQTT都可以作为考虑的方案之一。

使用方法

导入IBM提供的mqtt.jar包

然后在代码中配置如下。

在其他博客中有详细的配置参见

/article/7870218.html

实现原理

在Android中开启一个Service



public class PushService extends Service
{
// this is the log tag
public static final String      TAG = "PushService";

// the IP address, where your MQTT broker is running.
private static final String     MQTT_HOST = "172.16.26.41";//需要改成服务器IP
// the port at which the broker is running.
private static int              MQTT_BROKER_PORT_NUM      = 1883;//需要改成服务器port
// Let's not use the MQTT persistence.
private static MqttPersistence  MQTT_PERSISTENCE          = null;
// We don't need to remember any state between the connections, so we use a clean start.
private static boolean          MQTT_CLEAN_START          = true;
// Let's set the internal keep alive for MQTT to 15 mins. I haven't tested this value much. It could probably be increased.
private static short            MQTT_KEEP_ALIVE           = 60 * 15;
// Set quality of services to 0 (at most once delivery), since we don't want push notifications
// arrive more than once. However, this means that some messages might get lost (delivery is not guaranteed)
private static int[]            MQTT_QUALITIES_OF_SERVICE = { 0 } ;
private static int              MQTT_QUALITY_OF_SERVICE   = 0;
// The broker should not retain any messages.
private static boolean          MQTT_RETAINED_PUBLISH     = false;

// MQTT client ID, which is given the broker. In this example, I also use this for the topic header.
// You can use this to run push notifications for multiple apps with one MQTT broker.
public static String            MQTT_CLIENT_ID = "ata";//需要改成自己需要的名称

// These are the actions for the service (name are descriptive enough)
private static final String     ACTION_START = MQTT_CLIENT_ID + ".START";
private static final String     ACTION_STOP = MQTT_CLIENT_ID + ".STOP";
private static final String     ACTION_KEEPALIVE = MQTT_CLIENT_ID + ".KEEP_ALIVE";
private static final String     ACTION_RECONNECT = MQTT_CLIENT_ID + ".RECONNECT";

private ConnectivityManager     mConnMan;
private NotificationManager     mNotifMan;
private boolean                 mStarted;
private static final long       KEEP_ALIVE_INTERVAL = 1000 * 60 * 28;

// Retry intervals, when the connection is lost.
private static final long       INITIAL_RETRY_INTERVAL = 1000 * 10;
private static final long       MAXIMUM_RETRY_INTERVAL = 1000 * 60 * 30;

// Preferences instance
private SharedPreferences       mPrefs;
// We store in the preferences, whether or not the service has been started
//判断Service是否已经启动,不要重复启动!
public static final String      PREF_STARTED = "isStarted";
// We also store the deviceID (target)
//需要提供手机设备号,该设备号应该事先保存在SharedPreferences中
public static final String      PREF_DEVICE_ID = "deviceID";
// We store the last retry interval
public static final String      PREF_RETRY = "retryInterval";

// Notification title
public static String            NOTIF_TITLE = "ata"; //需要改成自己需要的title
// Notification id
private static final int        NOTIF_CONNECTED = 0;

// This is the instance of an MQTT connection.
private MQTTConnection          mConnection;
private long                    mStartTime;

// Static method to start the service
//在需要的地方直接调用该方法就启动监听了
public static void actionStart(Context ctx) {
Intent i = new Intent(ctx, PushService.class);
i.setAction(ACTION_START);
ctx.startService(i);
}

// Static method to stop the service
public static void actionStop(Context ctx) {
Intent i = new Intent(ctx, PushService.class);
i.setAction(ACTION_STOP);
ctx.startService(i);
}

// Static method to send a keep alive message
public static void actionPing(Context ctx) {
Intent i = new Intent(ctx, PushService.class);
i.setAction(ACTION_KEEPALIVE);
ctx.startService(i);
}

@Override
public void onCreate() {
super.onCreate();

log("Creating service");
mStartTime = System.currentTimeMillis();

/*try {
//mLog = new ConnectionLog();
//Log.i(TAG, "Opened log at " + mLog.getPath());
} catch (IOException e) {
Log.e(TAG, "Failed to open log", e);
}*/

// Get instances of preferences, connectivity manager and notification manager
mPrefs = getSharedPreferences(TAG, MODE_PRIVATE);
mConnMan = (ConnectivityManager)getSystemService(CONNECTIVITY_SERVICE);
mNotifMan = (NotificationManager)getSystemService(NOTIFICATION_SERVICE);

/* If our process was reaped by the system for any reason we need
* to restore our state with merely a call to onCreate.  We record
* the last "started" value and restore it here if necessary. */
handleCrashedService();
}

// This method does any necessary clean-up need in case the server has been destroyed by the system
// and then restarted
private void handleCrashedService() {
if (wasStarted() == true) {
log("Handling crashed service...");
// stop the keep alives
stopKeepAlives();

// Do a clean start
start();
}
}

@Override
public void onDestroy() {
log("Service destroyed (started=" + mStarted + ")");

// Stop the services, if it has been started
if (mStarted == true) {
stop();
}

/*  try {
if (mLog != null)
mLog.close();
} catch (IOException e) {}      */
}

@Override
public void onStart(Intent intent, int startId) {
super.onStart(intent, startId);
log("Service started with intent=" + intent);

// Do an appropriate action based on the intent.
if (intent.getAction().equals(ACTION_STOP) == true) {
stop();
stopSelf();
} else if (intent.getAction().equals(ACTION_START) == true) {
start();
} else if (intent.getAction().equals(ACTION_KEEPALIVE) == true) {
keepAlive();
} else if (intent.getAction().equals(ACTION_RECONNECT) == true) {
if (isNetworkAvailable()) {
reconnectIfNecessary();
}
}
}

@Override
public IBinder onBind(Intent intent) {
return null;
}

// log helper function
private void log(String message) {
log(message, null);
}
private void log(String message, Throwable e) {
if (e != null) {
Log.e(TAG, message, e);

} else {
Log.i(TAG, message);
}

/*if (mLog != null)
{
try {
mLog.println(message);
} catch (IOException ex) {}
}       */
}

// Reads whether or not the service has been started from the preferences
private boolean wasStarted() {
return mPrefs.getBoolean(PREF_STARTED, false);
}

// Sets whether or not the services has been started in the preferences.
private void setStarted(boolean started) {
mPrefs.edit().putBoolean(PREF_STARTED, started).commit();
mStarted = started;
}

private synchronized void start() {
log("Starting service...");

// Do nothing, if the service is already running.
if (mStarted == true) {
Log.w(TAG, "Attempt to start connection that is already active");
return;
}

// Establish an MQTT connection
connect();

// Register a connectivity listener
registerReceiver(mConnectivityChanged, new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION));
}

private synchronized void stop() {
// Do nothing, if the service is not running.
if (mStarted == false) {
Log.w(TAG, "Attempt to stop connection not active.");
return;
}

// Save stopped state in the preferences
setStarted(false);

// Remove the connectivity receiver
unregisterReceiver(mConnectivityChanged);
// Any existing reconnect timers should be removed, since we explicitly stopping the service.
cancelReconnect();

// Destroy the MQTT connection if there is one
if (mConnection != null) {
mConnection.disconnect();
mConnection = null;
}
}

//
private synchronized void connect() {
log("Connecting...");
// fetch the device ID from the preferences.
String deviceID = mPrefs.getString(PREF_DEVICE_ID, null);
// Create a new connection only if the device id is not NULL
if (deviceID == null) {
log("Device ID not found.");
} else {
try {
mConnection = new MQTTConnection(MQTT_HOST, deviceID);
} catch (MqttException e) {
// Schedule a reconnect, if we failed to connect
log("MqttException: " + (e.getMessage() != null ? e.getMessage() : "NULL"));
if (isNetworkAvailable()) {
scheduleReconnect(mStartTime);
}
}
setStarted(true);
}
}

private synchronized void keepAlive() {
try {
// Send a keep alive, if there is a connection.
if (mStarted == true && mConnection != null) {
mConnection.sendKeepAlive();
}
} catch (MqttException e) {
log("MqttException: " + (e.getMessage() != null? e.getMessage(): "NULL"), e);

mConnection.disconnect();
mConnection = null;
cancelReconnect();
}
}

// Schedule application level keep-alives using the AlarmManager
private void startKeepAlives() {
Intent i = new Intent();
i.setClass(this, PushService.class);
i.setAction(ACTION_KEEPALIVE);
PendingIntent pi = PendingIntent.getService(this, 0, i, 0);
AlarmManager alarmMgr = (AlarmManager)getSystemService(ALARM_SERVICE);
alarmMgr.setRepeating(AlarmManager.RTC_WAKEUP,
System.currentTimeMillis() + KEEP_ALIVE_INTERVAL,
KEEP_ALIVE_INTERVAL, pi);
}

// Remove all scheduled keep alives
private void stopKeepAlives() {
Intent i = new Intent();
i.setClass(this, PushService.class);
i.setAction(ACTION_KEEPALIVE);
PendingIntent pi = PendingIntent.getService(this, 0, i, 0);
AlarmManager alarmMgr = (AlarmManager)getSystemService(ALARM_SERVICE);
alarmMgr.cancel(pi);
}

// We schedule a reconnect based on the starttime of the service
public void scheduleReconnect(long startTime) {
// the last keep-alive interval
long interval = mPrefs.getLong(PREF_RETRY, INITIAL_RETRY_INTERVAL);

// Calculate the elapsed time since the start
long now = System.currentTimeMillis();
long elapsed = now - startTime;

// Set an appropriate interval based on the elapsed time since start
if (elapsed < interval) {
interval = Math.min(interval * 4, MAXIMUM_RETRY_INTERVAL);
} else {
interval = INITIAL_RETRY_INTERVAL;
}

log("Rescheduling connection in " + interval + "ms.");

// Save the new internval
mPrefs.edit().putLong(PREF_RETRY, interval).commit();

// Schedule a reconnect using the alarm manager.
Intent i = new Intent();
i.setClass(this, PushService.class);
i.setAction(ACTION_RECONNECT);
PendingIntent pi = PendingIntent.getService(this, 0, i, 0);
AlarmManager alarmMgr = (AlarmManager)getSystemService(ALARM_SERVICE);
alarmMgr.set(AlarmManager.RTC_WAKEUP, now + interval, pi);
}

// Remove the scheduled reconnect
public void cancelReconnect() {
Intent i = new Intent();
i.setClass(this, PushService.class);
i.setAction(ACTION_RECONNECT);
PendingIntent pi = PendingIntent.getService(this, 0, i, 0);
AlarmManager alarmMgr = (AlarmManager)getSystemService(ALARM_SERVICE);
alarmMgr.cancel(pi);
}

private synchronized void reconnectIfNecessary() {
if (mStarted == true && mConnection == null) {
log("Reconnecting...");
connect();
}
}

// This receiver listeners for network changes and updates the MQTT connection
// accordingly
private BroadcastReceiver mConnectivityChanged = new BroadcastReceiver() {
@Override
public void onReceive(Context context, Intent intent) {
// Get network info
NetworkInfo info = (NetworkInfo)intent.getParcelableExtra (ConnectivityManager.EXTRA_NETWORK_INFO);

// Is there connectivity?
boolean hasConnectivity = (info != null && info.isConnected()) ? true : false;

log("Connectivity changed: connected=" + hasConnectivity);

if (hasConnectivity) {
reconnectIfNecessary();
} else if (mConnection != null) {
// if there no connectivity, make sure MQTT connection is destroyed
mConnection.disconnect();
cancelReconnect();
mConnection = null;
}
}
};

// Display the topbar notification
private void showNotification(String content) {
Notification n = new Notification();

n.flags |= Notification.FLAG_SHOW_LIGHTS;
n.flags |= Notification.FLAG_AUTO_CANCEL;

n.defaults = Notification.DEFAULT_ALL;

n.icon = R.drawable.ic_dialog_info;
n.when = System.currentTimeMillis();

Log.i("PushService", "json==="+content);
String alert=null;
String id=null;
try {
JSONObject json = new JSONObject(content);
alert = json.optString("alert");
id = json.optString("id");
} catch (JSONException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

Intent intent = new Intent(this,NewMesageInfoActivity.class);
intent.putExtra("url", "http://testing.portal.ataudc.com/message/"+id);
intent.putExtra("id", id);
PendingIntent pi = PendingIntent.getActivity(this, 0,intent, PendingIntent.FLAG_ONE_SHOT);
// Change the name of the notification here
n.setLatestEventInfo(this, NOTIF_TITLE, alert, pi);

mNotifMan.notify(NOTIF_CONNECTED, n);
}

// Check if we are online
private boolean isNetworkAvailable() {
NetworkInfo info = mConnMan.getActiveNetworkInfo();
if (info == null) {
return false;
}
return info.isConnected();
}

// This inner class is a wrapper on top of MQTT client.
private class MQTTConnection implements MqttSimpleCallback {
IMqttClient mqttClient = null;

// Creates a new connection given the broker address and initial topic
public MQTTConnection(String brokerHostName, String initTopic) throws MqttException {
// Create connection spec
String mqttConnSpec = "tcp://" + brokerHostName + "@" + MQTT_BROKER_PORT_NUM;
// Create the client and connect
mqttClient = MqttClient.createMqttClient(mqttConnSpec, MQTT_PERSISTENCE);
String clientID = MQTT_CLIENT_ID + "/" + mPrefs.getString(PREF_DEVICE_ID, "");
mqttClient.connect(clientID, MQTT_CLEAN_START, MQTT_KEEP_ALIVE);

// register this client app has being able to receive messages
mqttClient.registerSimpleHandler(this);

// Subscribe to an initial topic, which is combination of client ID and device ID.
initTopic = MQTT_CLIENT_ID + "/" + initTopic;
subscribeToTopic(initTopic);

log("Connection established to " + brokerHostName + " on topic " + initTopic);

// Save start time
mStartTime = System.currentTimeMillis();
// Star the keep-alives
startKeepAlives();
}

// Disconnect
public void disconnect() {
try {
stopKeepAlives();
mqttClient.disconnect();
} catch (MqttPersistenceException e) {
log("MqttException" + (e.getMessage() != null? e.getMessage():" NULL"), e);
}
}

private void subscribeToTopic(String topicName) throws MqttException {

if ((mqttClient == null) || (mqttClient.isConnected() == false)) {
// quick sanity check - don't try and subscribe if we don't have
//  a connection
log("Connection error" + "No connection");
} else {
String[] topics = { topicName };
mqttClient.subscribe(topics, MQTT_QUALITIES_OF_SERVICE);
}
}
private void publishToTopic(String topicName, String message) throws MqttException {
if ((mqttClient == null) || (mqttClient.isConnected() == false)) {
// quick sanity check - don't try and publish if we don't have
//  a connection
log("No connection to public to");
} else {
mqttClient.publish(topicName,
message.getBytes(),
MQTT_QUALITY_OF_SERVICE,
MQTT_RETAINED_PUBLISH);
}
}

/*
* Called if the application loses it's connection to the message broker.
*/
public void connectionLost() throws Exception {
log("Loss of connection" + "connection downed");
stopKeepAlives();
// null itself
mConnection = null;
if (isNetworkAvailable() == true) {
reconnectIfNecessary();
}
}

/*
* Called when we receive a message from the message broker.
* 在这里处理服务器推送过来的数据
*/
public void publishArrived(String topicName, byte[] payload, int qos, boolean retained) {
// Show a notification
String s = new String(payload);
showNotification(s);
}

public void sendKeepAlive() throws MqttException {
log("Sending keep alive");
// publish to a keep-alive topic
publishToTopic(MQTT_CLIENT_ID + "/keepalive", mPrefs.getString(PREF_DEVICE_ID, ""));
}
}
}


从上面对比,在android端或者移动设备,mqtt在耗电,耗网上都比较适合,与服务器确认请求,只需要发送几个字节。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: