您的位置:首页 > 移动开发 > Android开发

ActiveMQ+MQTT协议 实现Android推送(根据订阅主题可实现点对点、集群推送)

2016-12-02 17:59 471 查看
   最近功能要做一个推送的功能,在不用第三方推送的前提下,考虑了 MQTT协议实现推送,听说它的效率比RabbitMQ高一些,参考了网上的代码,总结一下,谢了一个demo

服务端我用的是java代码实现的

我的服务器版本是 apache-activemq-5.14.1

public class MqttBroker {

/**
* 发送消息
*
* @param clientId   客户端主题 不是id
* @param messageId
*/
public void sendMessage(String clientId, String message) {

try {
//            	mqttClient = new MqttClient(CONNECTION_STRING);
if (mqttClient == null || !mqttClient.isConnected()) {
connect();
}
System.out.println("send message to  + clientId + , message is "
+ message);
// 发布自己的消息
mqttClient.publish(clientId, message.getBytes(),
0, false);
/*mqttClient.publish(GMCC/tokudu/ + clientId, message.getBytes(),
0, false); */
System.out.println( " #####################" + CLIENT_ID);
} catch (MqttException e) {
e.printStackTrace();
}
}

/**
* 简单回调函数,处理server接收到的主题消息
*
* @author Join
*
*/
class SimpleCallbackHandler implements MqttSimpleCallback {

/**
* 当客户机和broker意外断开时触发 可以再此处理重新订阅
*/
public void connectionLost() throws Exception {
// TODO Auto-generated method stub
System.out.println("客户机和broker已经断开");
}

/** 和broker已
* 客户端订阅消息后,该方法负责回调接收处理消息
*/
public void publishArrived(String topicName, byte[] payload, int Qos,
boolean retained) throws Exception {
// TODO Auto-generated method stub
System.out.println("订阅主题:"  + topicName);
System.out.println("消息数据:"  + new String(payload));
System.out.println("消息级别(0,1,2):"  + Qos);
System.out.println("是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): "
+ retained);
}

}

public static void main(String[] args) {
new MqttBroker().sendMessage("90","this is my message");
}  // 用反斜杠  不要用点
}


 Android客户端代码

package com.mst.v2.activity;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import com.ibm.mqtt.IMqttClient;
import com.ibm.mqtt.MqttClient;
import com.ibm.mqtt.MqttException;
import com.ibm.mqtt.MqttPersistence;
import com.ibm.mqtt.MqttPersistenceException;
import com.ibm.mqtt.MqttSimpleCallback;
import com.mst.v2.bean.MessageItem;
import com.mst.v2.db.MessageDbManager;
import com.mst.v2.util.MessageDailog;
import com.mst.v2.util.MyNotification;

import android.app.AlarmManager;
import android.app.Notification;
import android.app.NotificationManager;
import android.app.PendingIntent;
import android.app.Service;
import android.content.BroadcastReceiver;
import android.content.ComponentName;
import android.content.Context;
import android.content.Intent;
import android.content.IntentFilter;
import android.content.ServiceConnection;
import android.content.SharedPreferences;
import android.media.MediaPlayer;
import android.net.ConnectivityManager;
import android.net.NetworkInfo;
import android.os.Binder;
import android.os.Bundle;
import android.os.Handler;
import android.os.IBinder;
import android.os.Message;
import android.util.Log;
import android.widget.Toast;

import com.mst.v2.R;
import com.thunisoft.mst.shortcutbadger.ShortcutBadger;

/*
* PushService that does all of the work.
* Most of the logic is borrowed from KeepAliveService.
* http://code.google.com/p/android-random/source/browse/trunk/TestKeepAlive/src/org/devtcg/demo/keepalive/KeepAliveService.java?r=219 */
public class PushService extends Service {
// this is the log tag
public static final String TAG = "MQTTPushService";
public static final String BROKER_URL = "tcp://172.16.19.43:1883";
private MyState sta = MyState.getInstance();
// the IP address, where your MQTT broker is running.
private static final String MQTT_HOST = "172.16.19.43";//MyState.getInstance().ucmIp ; ;
// the port at which the broker is running.
private static int MQTT_BROKER_PORT_NUM = 1883;
// Let's not use the MQTT persistence.
private static MqttPersistence MQTT_PERSISTENCE = null;
// We don't need to remember any state between the connections, so we use a
// clean start.
private static boolean MQTT_CLEAN_START = true;
// Let's set the internal keep alive for MQTT to 15 mins. I haven't tested
// this value much. It could probably be increased.
private static short MQTT_KEEP_ALIVE = 5 * 60;// 60 * 15
// Set quality of services to 0 (at most once delivery), since we don't want
// push notifications
// arrive more than once. However, this means that some messages might get
// lost (delivery is not guaranteed)
private static int[] MQTT_QUALITIES_OF_SERVICE = { 0 };
private static int MQTT_QUALITY_OF_SERVICE = 0;
// The broker should not retain any messages.
private static boolean MQTT_RETAINED_PUBLISH = false;

// MQTT client ID, which is given the broker. In this example, I also use
// this for the topic header.
// You can use this to run push notifications for multiple apps with one
// MQTT broker.
public static String MQTT_CLIENT_ID = "mtClient";

// These are the actions for the service (name are descriptive enough)
private static final String ACTION_START = MQTT_CLIENT_ID + ".START";
private static final String ACTION_STOP = MQTT_CLIENT_ID + ".STOP";
private static final String ACTION_KEEPALIVE = MQTT_CLIENT_ID
+ ".KEEP_ALIVE";
private static final String ACTION_RECONNECT = MQTT_CLIENT_ID
+ ".RECONNECT";

// Connection log for the push service. Good for debugging.

// Connectivity manager to determining, when the phone loses connection
private ConnectivityManager mConnMan;
// Notification manager to displaying arrived push notifications
private NotificationManager mNotifMan;

// Whether or not the service has been started.
private boolean mStarted;

// This the application level keep-alive interval, that is used by the
// AlarmManager
// to keep the connection active, even when the device goes to sleep.
private static final long KEEP_ALIVE_INTERVAL = 1000 * 60 * 6;// 1000 * 60 *
// 28

// Retry intervals, when the connection is lost.
private static final long INITIAL_RETRY_INTERVAL = 1000 * 10;
private static final long MAXIMUM_RETRY_INTERVAL = 1000 * 60 * 8;// 1000 *
// 60 *
// 30

// Preferences instance
private SharedPreferences mPrefs;
// We store in the preferences, whether or not the service has been started
public static final String PREF_STARTED = "isStarted";
// We also store the deviceID (target)
public static final String PREF_DEVICE_ID = "deviceID";
// We store the last retry interval
public static final String PREF_RETRY = "retryInterval";

// Notification title
public static String NOTIF_TITLE = "Tokudu";
// Notification id
private static final int NOTIF_CONNECTED = 0;

// This is the instance of an MQTT connection.
private MQTTConnection mConnection;
private long mStartTime;
private Context mContext;
MessageDbManager dbManager;
public static int msgNumber = 0;// 服务器推送消息的数量

static PushService myService;

// Static method to start the service
public static void actionStart(Context ctx) {
Intent i = new Intent(ctx, PushService.class);
// i.set
4000
Flags( Intent.FLAG_ACTIVITY_NEW_TASK );
i.setAction(ACTION_START);
ctx.startService(i);
// ctx.bindService(i, connection, BIND_AUTO_CREATE);
}

// Static method to stop the service
public static void actionStop(Context ctx) {
Intent i = new Intent(ctx, PushService.class);
i.setAction(ACTION_STOP);
ctx.startService(i);
}

// Static method to send a keep alive message
public static void actionPing(Context ctx) {
Intent i = new Intent(ctx, PushService.class);
i.setAction(ACTION_KEEPALIVE);
ctx.startService(i);
}

@Override
public void onCreate() {
super.onCreate();

Log.i(TAG, "onCreate, creating service...");
mStartTime = System.currentTimeMillis();

// Get instances of preferences, connectivity manager and notification
// manager
mPrefs = getSharedPreferences(TAG, MODE_PRIVATE);
mConnMan = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE);
mNotifMan = (NotificationManager) getSystemService(NOTIFICATION_SERVICE);

/*
* If our process was reaped by the system for any reason we need to
* restore our state with merely a call to onCreate. We record the last
* "started" value and restore it here if necessary.
*/
handleCrashedService();

mContext = this;
dbManager = new MessageDbManager(mContext);
}

// This method does any necessary clean-up need in case the server has been
// destroyed by the system
// and then restarted
private void handleCrashedService() {
if (wasStarted() == true) {
Log.i(TAG, "Handle crashed service");
// stop the keep alives
stopKeepAlives();
// Do a clean start
startService();
}
}

@Override
public void onDestroy() {
Log.i(TAG, "onDestroy, Service started: " + mStarted);
// Stop the services, if it has been started
if (mStarted == true) {
stop();
}
}

@Override
public void onStart(Intent intent, int startId) {
super.onStart(intent, startId);
Log.i(TAG, "onStart, Service started with intent: " + intent);
if (intent == null) {
Log.e(TAG, "onStart intent = null");
return;
}

// Do an appropriate action based on the intent.
if (intent.getAction().equals(ACTION_STOP) == true) {
stop();
stopSelf();
} else if (intent.getAction().equals(ACTION_START) == true) {
new Thread() {
@Override
public void run() {
// TODO Auto-generated method stub
super.run();
startService();
}
}.start();
} else if (intent.getAction().equals(ACTION_KEEPALIVE) == true) {
keepAlive();
} else if (intent.getAction().equals(ACTION_RECONNECT) == true) {
if (isNetworkAvailable()) {
// if (mConnection == null)
reconnectIfNecessary();
}
}
}

@Override
public IBinder onBind(Intent intent) {
return null;
}

public class MyBinder extends Binder {
PushService getService() {
return PushService.this;
}
}

// Reads whether or not the service has been started from the preferences
private boolean wasStarted() {
return mPrefs.getBoolean(PREF_STARTED, false);
}

// Sets whether or not the services has been started in the preferences.
private void setStarted(boolean started) {
mPrefs.edit().putBoolean(PREF_STARTED, started).commit();
mStarted = started;
}

private synchronized void startService() {
Log.e(TAG, "startService, Starting...");
// Do nothing, if the service is already running.
if (mStarted == true) {
Log.e(TAG, "Attempt to connect MQTT that is already active");
return;
}

// Establish an MQTT connection
connect();

// Register a connectivity listener
// registerReceiver(mConnectivityChanged, new
// IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION));
IntentFilter filter = new IntentFilter("MESSAGE_ARRIVE_ACTION");
filter.addAction("DELETE_MESSAGE_ACTION");
registerReceiver(mMessageArrive, filter);
}

private synchronized void stop() {
// Do nothing, if the service is not running.
if (mStarted == false) {
Log.e(TAG, "Attempt to stop connection that not active.");
return;
}

// Save stopped state in the preferences
setStarted(false);

// Remove the connectivity receiver
// unregisterReceiver(mConnectivityChanged);
unregisterReceiver(mMessageArrive);
// Any existing reconnect timers should be removed, since we explicitly
// stopping the service.
cancelReconnect();

// Destroy the MQTT connection if there is one
if (mConnection != null) {
mConnection.disconnect();
mConnection = null;
}
}

int index = 1000;

private synchronized void connect() {
Log.i(TAG, "connect");
// fetch the device ID from the preferences.
// Create a new connection only if the device id is not NULL
// TODO Auto-generated method stub
String deviceID = mPrefs.getString(PREF_DEVICE_ID, null);
index++;
try {
Log.i(TAG, "brokerHostName: " + MQTT_HOST + ", initTopic: "
+ deviceID);
mConnection = new MQTTConnection(MQTT_HOST, deviceID);
} catch (MqttException e) {
// Schedule a reconnect, if we failed to connect
Log.e(TAG, "MQTTConnection MqttException "
+ (e.getMessage() != null ? e.getMessage() : "NULL"));
while (!isNetworkAvailable()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
if (isNetworkAvailable()) {
Log.e(TAG, "Network Available, scheduleReconnect");
scheduleReconnect(mStartTime);
} else {
Log.e(TAG, "MqttException Network not Available");
}
}
setStarted(true);
}

private synchronized void keepAlive() {
Log.i(TAG, "keepAlive");
try {
Log.i(TAG, "sendKeepAlive, mStarted: " + mStarted
+ ", mConnection: " + mConnection);
// Send a keep alive, if there is a connection.
if (mStarted == true && mConnection != null) {
mConnection.sendKeepAlive();
}
} catch (MqttException e) {
Log.e(TAG, "sendKeepAlive MqttException: "
+ (e.getMessage() != null ? e.getMessage() : "NULL"));
mConnection.disconnect();
mConnection = null;
cancelReconnect();
}
}

// Schedule application level keep-alives using the AlarmManager

/**
* 启动心跳包闹钟
*/
private void startKeepAlives() {
Log.i(TAG, "startKeepAlives");
Intent i = new Intent();
i.setClass(this, PushService.class);
i.setAction(ACTION_KEEPALIVE);
PendingIntent pi = PendingIntent.getService(this, 0, i, 0);
AlarmManager alarmMgr = (AlarmManager) getSystemService(ALARM_SERVICE);
alarmMgr.setRepeating(AlarmManager.RTC_WAKEUP,
System.currentTimeMillis() + KEEP_ALIVE_INTERVAL,
KEEP_ALIVE_INTERVAL, pi);
}

// Remove all scheduled keep alives
/**
* 取消已经存在的闹钟
*/
private void stopKeepAlives() {
Log.i(TAG, "stopKeepAlives");
Intent i = new Intent();
i.setClass(this, PushService.class);
i.setAction(ACTION_KEEPALIVE);
PendingIntent pi = PendingIntent.getService(this, 0, i, 0);
AlarmManager alarmMgr = (AlarmManager) getSystemService(ALARM_SERVICE);
alarmMgr.cancel(pi);
}

// We schedule a reconnect based on the starttime of the service
public void scheduleReconnect(long startTime) {
// the last keep-alive interval
Log.i(TAG, "scheduleReconnect");
long interval = mPrefs.getLong(PREF_RETRY, INITIAL_RETRY_INTERVAL);

// Calculate the elapsed time since the start
long now = System.currentTimeMillis();
long elapsed = now - startTime;

// Set an appropriate interval based on the elapsed time since start
if (elapsed < interval) {
interval = Math.min(interval * 4, MAXIMUM_RETRY_INTERVAL);
} else {
interval = INITIAL_RETRY_INTERVAL;
}

// Save the new internval
Log.i(TAG, "Rescheduling connection interval:  " + interval + "(ms).");
mPrefs.edit().putLong(PREF_RETRY, interval).commit();

// Schedule a reconnect using the alarm manager.
Intent i = new Intent();
i.setClass(this, PushService.class);
i.setAction(ACTION_RECONNECT);
PendingIntent pi = PendingIntent.getService(this, 0, i, 0);
AlarmManager alarmMgr = (AlarmManager) getSystemService(ALARM_SERVICE);
alarmMgr.set(AlarmManager.RTC_WAKEUP, now + interval, pi);
}

// Remove the scheduled reconnect
public void cancelReconnect() {
Log.i(TAG, "cancelReconnect");
Intent i = new Intent();
i.setClass(this, PushService.class);
i.setAction(ACTION_RECONNECT);
PendingIntent pi = PendingIntent.getService(this, 0, i, 0);
AlarmManager alarmMgr = (AlarmManager) getSystemService(ALARM_SERVICE);
alarmMgr.cancel(pi);
}

private synchronized void reconnectIfNecessary() {
Log.i(TAG, "reconnectIfNecessary mStarted: " + mStarted
+ ", mConnection: " + mConnection);
if (mStarted == true && mConnection == null) {
new Thread() {
public void run() {
connect();
}
}.start();
}
}

// This receiver listeners for network changes and updates the MQTT
// connection accordingly
private BroadcastReceiver mConnectivityChanged = new BroadcastReceiver() {
@Override
public void onReceive(Context context, Intent intent) {
Log.i(TAG, "mConnectivityChanged onReceive");
// Get network info
NetworkInfo info = (NetworkInfo) intent
.getParcelableExtra(ConnectivityManager.EXTRA_NETWORK_INFO);

boolean isConnected = false;
if (null != info) {
isConnected = info.isConnected();
}
Log.i(TAG, "Connectivity changed, isConnected: " + isConnected);

if (isConnected) {
reconnectIfNecessary();
Log.i(TAG, "reconnectIfNecessary");
} else if (mConnection != null) {
// if there no connectivity, make sure MQTT connection is
// destroyed
mConnection.disconnect();
mConnection = null;
cancelReconnect();
Log.i(TAG, "make MQTT connection destroyed");
}
}
};

public BroadcastReceiver mMessageArrive = new BroadcastReceiver() {
@Override
public void onReceive(Context arg0, Intent arg1) {
Log.i(TAG, "mMessageArrive onReceive");
// TODO Auto-generated method stub
// Toast.makeText(mContext, "1", Toast.LENGTH_SHORT).show();
if (arg1.getAction().equals("MESSAGE_ARRIVE_ACTION")) {
Bundle bundle = arg1.getExtras();
String account = bundle.getString("account");
String time = bundle.getString("time");
String type = bundle.getString("type");
String content = bundle.getString("content");
// Log.i(TAG, "111 account: " + account + ", time: " + time +
// ", type: " + type);
// Log.i(TAG, "222 content: " + content);
if (type.equals("5")) {// 视频
String[] mp4 = content.split(";"); // 多个视频用;隔开(文件名1;文件1url;文件名2;文件2url)
// Log.i(TAG, " 333 lenght: " + mp4.length + ", fileNum: " +
// mp4.length / 2);
for (int i = 0; i < mp4.length; i += 2) {
String mp4Content = mp4[i] + ";" + mp4[i + 1];
// Log.i(TAG, "444 mp4[" + i + "]: " + mp4[i] + "mp4[" +
// (i+1) + "]: " + mp4[i+1]);
// Log.i(TAG, "555 mp4Content: " + mp4Content);
dbManager.addMsg(new MessageItem(time, 1, Integer
.parseInt(type), mp4Content, account));
}

/*
* String[] mp4 = content.split("&&"); // 多个视频用&&隔开 for (int
* i = 0; i < mp4.length; i++) { Log.i("TAG",
* "333  mp4["+i+"]-->"+mp4[i]); dbManager.addMsg(new
* MessageItem(time, 1, Integer.parseInt(type), mp4[i],
* account)); }
*/
} else if (type.equals("1")) {// 图片
String[] jpg = content.split(";");
// Log.i(TAG, "type jpg.length: " + jpg.length);
for (int i = 0; i < jpg.length; i++) {
dbManager.addMsg(new MessageItem(time, 1, Integer
.parseInt(type), jpg[i], account));
}
} else {
dbManager.addMsg(new MessageItem(time, 1, Integer
.parseInt(type), content, account));

}
MainActivity.mMsgHandler.sendEmptyMessage(0);

MyNotification.getInstance(mContext).custom(
dbManager.getMsgNumber(), content);
MediaPlayer mMediaPlayer;
mMediaPlayer = MediaPlayer.create(mContext, R.raw.videorecord);
mMediaPlayer.setLooping(false);
try {
mMediaPlayer.prepare();
} catch (Exception e) {
e.printStackTrace();
}
mMediaPlayer.start();

} else if (arg1.getAction().equals("DELETE_MESSAGE_ACTION")) {
// Toast.makeText(mContext, "2", Toast.LENGTH_SHORT).show();
List<MessageItem> itemList = dbManager.getMsgItem();
for (MessageItem item : itemList) {
try {
// Toast.makeText(mContext, "3",
// Toast.LENGTH_SHORT).show();
if (mConnection != null) {
// Toast.makeText(mContext,
// "发送 time="+item.getmsgTime()+" size="+item.getmsgTime().length(),
// Toast.LENGTH_LONG).show();
mConnection.publishToTopic(
MyState.getInstance().ucmUser + "_cb",
item.getmsgTime() + "");
// Toast.makeText(mContext,
// "发过去了 "+MyState.getInstance().ucmUser+"_cb"+item.getmsgTime(),
// Toast.LENGTH_SHORT).show();
} else {
Toast.makeText(mContext, "连接断开了",
Toast.LENGTH_SHORT).show();
}
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
Log.e("ttt", "kkkkkk bad");
}
}
dbManager.deleteMsg();
}
}
};

// Display the topbar notification
private void showNotification(String text) {
Log.i(TAG, "showNotification");
Notification n = new Notification();

n.flags |= Notification.FLAG_SHOW_LIGHTS;
n.flags |= Notification.FLAG_AUTO_CANCEL;

n.defaults = Notification.DEFAULT_ALL;

n.icon = R.drawable.logo;
n.when = System.currentTimeMillis();

// Simply open the parent activity
PendingIntent pi = PendingIntent.getActivity(this, 0, new Intent(this,
MainActivity.class), 0);

// Change the name of the notification here
n.setLatestEventInfo(this, NOTIF_TITLE, text, pi);

mNotifMan.notify(NOTIF_CONNECTED, n);
}

// Check if we are online
private boolean isNetworkAvailable() {
NetworkInfo info = mConnMan.getActiveNetworkInfo();
if (info != null) {
return info.isConnected();
}
return false;
}

// This inner class is a wrapper on top of MQTT client.
private class MQTTConnection implements MqttSimpleCallback {
IMqttClient mqttClient = null;

// Creates a new connection given the broker address and initial topic
public MQTTConnection(String brokerHostName, String initTopic)
throws MqttException {
// Create connection spec
Log.i(TAG, "brokerHostName: " + brokerHostName + "initTopic: "
+ initTopic);
String mqttConnSpec = "tcp://" + brokerHostName + "@"
+ MQTT_BROKER_PORT_NUM;
// Create the client and connect
mqttClient = new MqttClient(BROKER_URL,MQTT_PERSISTENCE);
//			mqttClient = MqttClient.createMqttClient(mqttConnSpec,
//					MQTT_PERSISTENCE);
String deviceId = mPrefs.getString(PREF_DEVICE_ID, "");
String clientID = MQTT_CLIENT_ID + "/" + deviceId;
Log.i(TAG, "clientID: " + clientID);
String account = MyState.getInstance().ucmUser;
clientID = "MT/" + account + "_" + Integer.toString(index);
Log.i(TAG, "connect, clientID: " + clientID + ", cleanstart: "
+ MQTT_CLEAN_START + ", keepalive: " + MQTT_KEEP_ALIVE);
mqttClient.connect(clientID, MQTT_CLEAN_START, MQTT_KEEP_ALIVE);

// register this client app has being able to receive messages
mqttClient.registerSimpleHandler(this);

// Subscribe to an initial topic, which is combination of client ID
// and device ID.
initTopic = MyState.getInstance().ucmUser;// + "/+";
//initTopic = clientID + "@" + initTopic;
Log.i(TAG, "subscribeToTopic initTopic: " + initTopic);

/*
* Message msg = new Message(); msg.what = 4; msg.obj = initTopic;
* hander.sendMessage(msg);
*/
subscribeToTopic(initTopic);

// cancelReconnect();
// Save start time
mStartTime = System.currentTimeMillis();
// Star the keep-alives
startKeepAlives();
// sendKeepAlive();
}

// Disconnect
public void disconnect() {
Log.i(TAG, "disconnect");
try {
stopKeepAlives();
mqttClient.disconnect();
mqttClient = null;
} catch (MqttPersistenceException e) {
Log.e(TAG, "disconnect MqttException: "
+ (e.getMessage() != null ? e.getMessage() : " NULL"));
}
}

/*
* Send a request to the message broker to be sent messages published
* with the specified topic name. Wildcards are allowed.
*/
private void subscribeToTopic(String topicName)
c278
throws MqttException {
Log.i(TAG, "subscribeToTopic, topicName:" + topicName);
if ((mqttClient == null) || (mqttClient.isConnected() == false)) {
// quick sanity check - don't try and subscribe if we don't have
// a connection
Log.e(TAG, "subscribeToTopic :Connection error"
+ "No connection");
} else {
String[] topics = { topicName };
int subscribe = mqttClient.subscribe(topics,MQTT_QUALITIES_OF_SERVICE);

Message msg = StatusBarManager.getInstance(mContext).mHandler
.obtainMessage();
msg.what = StatusBarManager.MSG_STATUS_IMAGE_MSG;
msg.arg1 = subscribe;
StatusBarManager.getInstance(mContext).onStatusChange(msg);
Log.e(TAG, "subscribe: " + subscribe + "  topicName:"
+ topicName);
Log.e(TAG, "subscribeToTopic :topic=" + topicName);
}
}

/*
* Sends a message to the message broker, requesting that it be
* published to the specified topic.
*/
public void publishToTopic(String topicName, String message)
throws MqttException {
if ((mqttClient == null) /* || (mqttClient.isConnected() == false) */) {
// quick sanity check - don't try and publish if we don't have
// a connection
Log.e(TAG, "publishToTopic, mqttClient is null");
// connect();
} else {
mqttClient.publish(topicName, message.getBytes(),
MQTT_QUALITY_OF_SERVICE, MQTT_RETAINED_PUBLISH);
Log.i(TAG, "publishToTopic, topicName: " + topicName
+ ", message: " + message);
}
}

/*
* Called if the application loses it's connection to the message
* broker.
*/
public void connectionLost() throws Exception {
Log.e(TAG, "connectionLost" + " connection downed");
stopKeepAlives();
mConnection = null;
new Thread() {
public void run() {
int i = 0;
while (i < 10) {
if (isNetworkAvailable() == true) {
Log.e(TAG,
"network available, reconnectIfNecessary");
reconnectIfNecessary();
break;
} else {
Log.e(TAG, "network not available, sleep");
i++;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}.start();
}

/*
* Called when we receive a message from the message broker.
*/
public void publishArrived(String topicName, byte[] payload, int qos,
boolean retained) {
// 如果没在群组内,直接return,只有在群组中才能接收消息
if (!sta.isgrp) {
return;
}
// Show a notification
String msg = new String(payload);
// Toast.makeText(mContext, s, Toast.LENGTH_LONG).show();
// showNotification(s);
int msgNumber = mContext.getSharedPreferences("msg", 0).getInt(
"msgNumber", 0);
msgNumber++;// 每来一次消息推送,消息数量加1
// Log.i(TAG, "msgNumber: " + msgNumber);
SharedPreferences sp = getSharedPreferences("msg", 0);
sp.edit().putInt("msgNumber", msgNumber).commit();

int badgeCount = msgNumber;
ShortcutBadger.applyCount(mContext, badgeCount);
Log.e(TAG, "cpublishArrived topic: " + topicName + ", msg: " + msg);
if (msg.length() > 17) {
String time = msg.substring(0, 16);
// Toast.makeText(mContext,
// "接收 time="+time+" size="+time.length(),
// Toast.LENGTH_LONG).show();
String type = msg.substring(16, 17);
String content = msg.substring(17);
Intent i = new Intent("MESSAGE_ARRIVE_ACTION");
i.putExtra("account", topicName);
i.putExtra("time", time);
i.putExtra("type", type);
i.putExtra("content", content);
mContext.sendBroadcast(i);
// Log.i(TAG, "sendBroadcast over");
}
}

public void sendKeepAlive() throws MqttException {
Log.i(TAG, "Sending keepalive");
// publish to a keep-alive topic
publishToTopic(MQTT_CLIENT_ID + "/keepalive",
mPrefs.getString(PREF_DEVICE_ID, "123"));
}
}
}
 需要请自行下载 .jar都在demo里面,包括java服务端和Android 客户端
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: