ActiveMQ+MQTT协议 实现Android推送(根据订阅主题可实现点对点、集群推送)
2016-12-02 17:59
471 查看
最近功能要做一个推送的功能,在不用第三方推送的前提下,考虑了 MQTT协议实现推送,听说它的效率比RabbitMQ高一些,参考了网上的代码,总结一下,谢了一个demo
服务端我用的是java代码实现的
我的服务器版本是 apache-activemq-5.14.1
Android客户端代码
服务端我用的是java代码实现的
我的服务器版本是 apache-activemq-5.14.1
public class MqttBroker { /** * 发送消息 * * @param clientId 客户端主题 不是id * @param messageId */ public void sendMessage(String clientId, String message) { try { // mqttClient = new MqttClient(CONNECTION_STRING); if (mqttClient == null || !mqttClient.isConnected()) { connect(); } System.out.println("send message to + clientId + , message is " + message); // 发布自己的消息 mqttClient.publish(clientId, message.getBytes(), 0, false); /*mqttClient.publish(GMCC/tokudu/ + clientId, message.getBytes(), 0, false); */ System.out.println( " #####################" + CLIENT_ID); } catch (MqttException e) { e.printStackTrace(); } } /** * 简单回调函数,处理server接收到的主题消息 * * @author Join * */ class SimpleCallbackHandler implements MqttSimpleCallback { /** * 当客户机和broker意外断开时触发 可以再此处理重新订阅 */ public void connectionLost() throws Exception { // TODO Auto-generated method stub System.out.println("客户机和broker已经断开"); } /** 和broker已 * 客户端订阅消息后,该方法负责回调接收处理消息 */ public void publishArrived(String topicName, byte[] payload, int Qos, boolean retained) throws Exception { // TODO Auto-generated method stub System.out.println("订阅主题:" + topicName); System.out.println("消息数据:" + new String(payload)); System.out.println("消息级别(0,1,2):" + Qos); System.out.println("是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): " + retained); } } public static void main(String[] args) { new MqttBroker().sendMessage("90","this is my message"); } // 用反斜杠 不要用点 }
Android客户端代码
package com.mst.v2.activity; import java.io.IOException; import java.util.Arrays; import java.util.List; import com.ibm.mqtt.IMqttClient; import com.ibm.mqtt.MqttClient; import com.ibm.mqtt.MqttException; import com.ibm.mqtt.MqttPersistence; import com.ibm.mqtt.MqttPersistenceException; import com.ibm.mqtt.MqttSimpleCallback; import com.mst.v2.bean.MessageItem; import com.mst.v2.db.MessageDbManager; import com.mst.v2.util.MessageDailog; import com.mst.v2.util.MyNotification; import android.app.AlarmManager; import android.app.Notification; import android.app.NotificationManager; import android.app.PendingIntent; import android.app.Service; import android.content.BroadcastReceiver; import android.content.ComponentName; import android.content.Context; import android.content.Intent; import android.content.IntentFilter; import android.content.ServiceConnection; import android.content.SharedPreferences; import android.media.MediaPlayer; import android.net.ConnectivityManager; import android.net.NetworkInfo; import android.os.Binder; import android.os.Bundle; import android.os.Handler; import android.os.IBinder; import android.os.Message; import android.util.Log; import android.widget.Toast; import com.mst.v2.R; import com.thunisoft.mst.shortcutbadger.ShortcutBadger; /* * PushService that does all of the work. * Most of the logic is borrowed from KeepAliveService. * http://code.google.com/p/android-random/source/browse/trunk/TestKeepAlive/src/org/devtcg/demo/keepalive/KeepAliveService.java?r=219 */ public class PushService extends Service { // this is the log tag public static final String TAG = "MQTTPushService"; public static final String BROKER_URL = "tcp://172.16.19.43:1883"; private MyState sta = MyState.getInstance(); // the IP address, where your MQTT broker is running. private static final String MQTT_HOST = "172.16.19.43";//MyState.getInstance().ucmIp ; ; // the port at which the broker is running. private static int MQTT_BROKER_PORT_NUM = 1883; // Let's not use the MQTT persistence. private static MqttPersistence MQTT_PERSISTENCE = null; // We don't need to remember any state between the connections, so we use a // clean start. private static boolean MQTT_CLEAN_START = true; // Let's set the internal keep alive for MQTT to 15 mins. I haven't tested // this value much. It could probably be increased. private static short MQTT_KEEP_ALIVE = 5 * 60;// 60 * 15 // Set quality of services to 0 (at most once delivery), since we don't want // push notifications // arrive more than once. However, this means that some messages might get // lost (delivery is not guaranteed) private static int[] MQTT_QUALITIES_OF_SERVICE = { 0 }; private static int MQTT_QUALITY_OF_SERVICE = 0; // The broker should not retain any messages. private static boolean MQTT_RETAINED_PUBLISH = false; // MQTT client ID, which is given the broker. In this example, I also use // this for the topic header. // You can use this to run push notifications for multiple apps with one // MQTT broker. public static String MQTT_CLIENT_ID = "mtClient"; // These are the actions for the service (name are descriptive enough) private static final String ACTION_START = MQTT_CLIENT_ID + ".START"; private static final String ACTION_STOP = MQTT_CLIENT_ID + ".STOP"; private static final String ACTION_KEEPALIVE = MQTT_CLIENT_ID + ".KEEP_ALIVE"; private static final String ACTION_RECONNECT = MQTT_CLIENT_ID + ".RECONNECT"; // Connection log for the push service. Good for debugging. // Connectivity manager to determining, when the phone loses connection private ConnectivityManager mConnMan; // Notification manager to displaying arrived push notifications private NotificationManager mNotifMan; // Whether or not the service has been started. private boolean mStarted; // This the application level keep-alive interval, that is used by the // AlarmManager // to keep the connection active, even when the device goes to sleep. private static final long KEEP_ALIVE_INTERVAL = 1000 * 60 * 6;// 1000 * 60 * // 28 // Retry intervals, when the connection is lost. private static final long INITIAL_RETRY_INTERVAL = 1000 * 10; private static final long MAXIMUM_RETRY_INTERVAL = 1000 * 60 * 8;// 1000 * // 60 * // 30 // Preferences instance private SharedPreferences mPrefs; // We store in the preferences, whether or not the service has been started public static final String PREF_STARTED = "isStarted"; // We also store the deviceID (target) public static final String PREF_DEVICE_ID = "deviceID"; // We store the last retry interval public static final String PREF_RETRY = "retryInterval"; // Notification title public static String NOTIF_TITLE = "Tokudu"; // Notification id private static final int NOTIF_CONNECTED = 0; // This is the instance of an MQTT connection. private MQTTConnection mConnection; private long mStartTime; private Context mContext; MessageDbManager dbManager; public static int msgNumber = 0;// 服务器推送消息的数量 static PushService myService; // Static method to start the service public static void actionStart(Context ctx) { Intent i = new Intent(ctx, PushService.class); // i.set 4000 Flags( Intent.FLAG_ACTIVITY_NEW_TASK ); i.setAction(ACTION_START); ctx.startService(i); // ctx.bindService(i, connection, BIND_AUTO_CREATE); } // Static method to stop the service public static void actionStop(Context ctx) { Intent i = new Intent(ctx, PushService.class); i.setAction(ACTION_STOP); ctx.startService(i); } // Static method to send a keep alive message public static void actionPing(Context ctx) { Intent i = new Intent(ctx, PushService.class); i.setAction(ACTION_KEEPALIVE); ctx.startService(i); } @Override public void onCreate() { super.onCreate(); Log.i(TAG, "onCreate, creating service..."); mStartTime = System.currentTimeMillis(); // Get instances of preferences, connectivity manager and notification // manager mPrefs = getSharedPreferences(TAG, MODE_PRIVATE); mConnMan = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE); mNotifMan = (NotificationManager) getSystemService(NOTIFICATION_SERVICE); /* * If our process was reaped by the system for any reason we need to * restore our state with merely a call to onCreate. We record the last * "started" value and restore it here if necessary. */ handleCrashedService(); mContext = this; dbManager = new MessageDbManager(mContext); } // This method does any necessary clean-up need in case the server has been // destroyed by the system // and then restarted private void handleCrashedService() { if (wasStarted() == true) { Log.i(TAG, "Handle crashed service"); // stop the keep alives stopKeepAlives(); // Do a clean start startService(); } } @Override public void onDestroy() { Log.i(TAG, "onDestroy, Service started: " + mStarted); // Stop the services, if it has been started if (mStarted == true) { stop(); } } @Override public void onStart(Intent intent, int startId) { super.onStart(intent, startId); Log.i(TAG, "onStart, Service started with intent: " + intent); if (intent == null) { Log.e(TAG, "onStart intent = null"); return; } // Do an appropriate action based on the intent. if (intent.getAction().equals(ACTION_STOP) == true) { stop(); stopSelf(); } else if (intent.getAction().equals(ACTION_START) == true) { new Thread() { @Override public void run() { // TODO Auto-generated method stub super.run(); startService(); } }.start(); } else if (intent.getAction().equals(ACTION_KEEPALIVE) == true) { keepAlive(); } else if (intent.getAction().equals(ACTION_RECONNECT) == true) { if (isNetworkAvailable()) { // if (mConnection == null) reconnectIfNecessary(); } } } @Override public IBinder onBind(Intent intent) { return null; } public class MyBinder extends Binder { PushService getService() { return PushService.this; } } // Reads whether or not the service has been started from the preferences private boolean wasStarted() { return mPrefs.getBoolean(PREF_STARTED, false); } // Sets whether or not the services has been started in the preferences. private void setStarted(boolean started) { mPrefs.edit().putBoolean(PREF_STARTED, started).commit(); mStarted = started; } private synchronized void startService() { Log.e(TAG, "startService, Starting..."); // Do nothing, if the service is already running. if (mStarted == true) { Log.e(TAG, "Attempt to connect MQTT that is already active"); return; } // Establish an MQTT connection connect(); // Register a connectivity listener // registerReceiver(mConnectivityChanged, new // IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION)); IntentFilter filter = new IntentFilter("MESSAGE_ARRIVE_ACTION"); filter.addAction("DELETE_MESSAGE_ACTION"); registerReceiver(mMessageArrive, filter); } private synchronized void stop() { // Do nothing, if the service is not running. if (mStarted == false) { Log.e(TAG, "Attempt to stop connection that not active."); return; } // Save stopped state in the preferences setStarted(false); // Remove the connectivity receiver // unregisterReceiver(mConnectivityChanged); unregisterReceiver(mMessageArrive); // Any existing reconnect timers should be removed, since we explicitly // stopping the service. cancelReconnect(); // Destroy the MQTT connection if there is one if (mConnection != null) { mConnection.disconnect(); mConnection = null; } } int index = 1000; private synchronized void connect() { Log.i(TAG, "connect"); // fetch the device ID from the preferences. // Create a new connection only if the device id is not NULL // TODO Auto-generated method stub String deviceID = mPrefs.getString(PREF_DEVICE_ID, null); index++; try { Log.i(TAG, "brokerHostName: " + MQTT_HOST + ", initTopic: " + deviceID); mConnection = new MQTTConnection(MQTT_HOST, deviceID); } catch (MqttException e) { // Schedule a reconnect, if we failed to connect Log.e(TAG, "MQTTConnection MqttException " + (e.getMessage() != null ? e.getMessage() : "NULL")); while (!isNetworkAvailable()) { try { Thread.sleep(1000); } catch (InterruptedException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } if (isNetworkAvailable()) { Log.e(TAG, "Network Available, scheduleReconnect"); scheduleReconnect(mStartTime); } else { Log.e(TAG, "MqttException Network not Available"); } } setStarted(true); } private synchronized void keepAlive() { Log.i(TAG, "keepAlive"); try { Log.i(TAG, "sendKeepAlive, mStarted: " + mStarted + ", mConnection: " + mConnection); // Send a keep alive, if there is a connection. if (mStarted == true && mConnection != null) { mConnection.sendKeepAlive(); } } catch (MqttException e) { Log.e(TAG, "sendKeepAlive MqttException: " + (e.getMessage() != null ? e.getMessage() : "NULL")); mConnection.disconnect(); mConnection = null; cancelReconnect(); } } // Schedule application level keep-alives using the AlarmManager /** * 启动心跳包闹钟 */ private void startKeepAlives() { Log.i(TAG, "startKeepAlives"); Intent i = new Intent(); i.setClass(this, PushService.class); i.setAction(ACTION_KEEPALIVE); PendingIntent pi = PendingIntent.getService(this, 0, i, 0); AlarmManager alarmMgr = (AlarmManager) getSystemService(ALARM_SERVICE); alarmMgr.setRepeating(AlarmManager.RTC_WAKEUP, System.currentTimeMillis() + KEEP_ALIVE_INTERVAL, KEEP_ALIVE_INTERVAL, pi); } // Remove all scheduled keep alives /** * 取消已经存在的闹钟 */ private void stopKeepAlives() { Log.i(TAG, "stopKeepAlives"); Intent i = new Intent(); i.setClass(this, PushService.class); i.setAction(ACTION_KEEPALIVE); PendingIntent pi = PendingIntent.getService(this, 0, i, 0); AlarmManager alarmMgr = (AlarmManager) getSystemService(ALARM_SERVICE); alarmMgr.cancel(pi); } // We schedule a reconnect based on the starttime of the service public void scheduleReconnect(long startTime) { // the last keep-alive interval Log.i(TAG, "scheduleReconnect"); long interval = mPrefs.getLong(PREF_RETRY, INITIAL_RETRY_INTERVAL); // Calculate the elapsed time since the start long now = System.currentTimeMillis(); long elapsed = now - startTime; // Set an appropriate interval based on the elapsed time since start if (elapsed < interval) { interval = Math.min(interval * 4, MAXIMUM_RETRY_INTERVAL); } else { interval = INITIAL_RETRY_INTERVAL; } // Save the new internval Log.i(TAG, "Rescheduling connection interval: " + interval + "(ms)."); mPrefs.edit().putLong(PREF_RETRY, interval).commit(); // Schedule a reconnect using the alarm manager. Intent i = new Intent(); i.setClass(this, PushService.class); i.setAction(ACTION_RECONNECT); PendingIntent pi = PendingIntent.getService(this, 0, i, 0); AlarmManager alarmMgr = (AlarmManager) getSystemService(ALARM_SERVICE); alarmMgr.set(AlarmManager.RTC_WAKEUP, now + interval, pi); } // Remove the scheduled reconnect public void cancelReconnect() { Log.i(TAG, "cancelReconnect"); Intent i = new Intent(); i.setClass(this, PushService.class); i.setAction(ACTION_RECONNECT); PendingIntent pi = PendingIntent.getService(this, 0, i, 0); AlarmManager alarmMgr = (AlarmManager) getSystemService(ALARM_SERVICE); alarmMgr.cancel(pi); } private synchronized void reconnectIfNecessary() { Log.i(TAG, "reconnectIfNecessary mStarted: " + mStarted + ", mConnection: " + mConnection); if (mStarted == true && mConnection == null) { new Thread() { public void run() { connect(); } }.start(); } } // This receiver listeners for network changes and updates the MQTT // connection accordingly private BroadcastReceiver mConnectivityChanged = new BroadcastReceiver() { @Override public void onReceive(Context context, Intent intent) { Log.i(TAG, "mConnectivityChanged onReceive"); // Get network info NetworkInfo info = (NetworkInfo) intent .getParcelableExtra(ConnectivityManager.EXTRA_NETWORK_INFO); boolean isConnected = false; if (null != info) { isConnected = info.isConnected(); } Log.i(TAG, "Connectivity changed, isConnected: " + isConnected); if (isConnected) { reconnectIfNecessary(); Log.i(TAG, "reconnectIfNecessary"); } else if (mConnection != null) { // if there no connectivity, make sure MQTT connection is // destroyed mConnection.disconnect(); mConnection = null; cancelReconnect(); Log.i(TAG, "make MQTT connection destroyed"); } } }; public BroadcastReceiver mMessageArrive = new BroadcastReceiver() { @Override public void onReceive(Context arg0, Intent arg1) { Log.i(TAG, "mMessageArrive onReceive"); // TODO Auto-generated method stub // Toast.makeText(mContext, "1", Toast.LENGTH_SHORT).show(); if (arg1.getAction().equals("MESSAGE_ARRIVE_ACTION")) { Bundle bundle = arg1.getExtras(); String account = bundle.getString("account"); String time = bundle.getString("time"); String type = bundle.getString("type"); String content = bundle.getString("content"); // Log.i(TAG, "111 account: " + account + ", time: " + time + // ", type: " + type); // Log.i(TAG, "222 content: " + content); if (type.equals("5")) {// 视频 String[] mp4 = content.split(";"); // 多个视频用;隔开(文件名1;文件1url;文件名2;文件2url) // Log.i(TAG, " 333 lenght: " + mp4.length + ", fileNum: " + // mp4.length / 2); for (int i = 0; i < mp4.length; i += 2) { String mp4Content = mp4[i] + ";" + mp4[i + 1]; // Log.i(TAG, "444 mp4[" + i + "]: " + mp4[i] + "mp4[" + // (i+1) + "]: " + mp4[i+1]); // Log.i(TAG, "555 mp4Content: " + mp4Content); dbManager.addMsg(new MessageItem(time, 1, Integer .parseInt(type), mp4Content, account)); } /* * String[] mp4 = content.split("&&"); // 多个视频用&&隔开 for (int * i = 0; i < mp4.length; i++) { Log.i("TAG", * "333 mp4["+i+"]-->"+mp4[i]); dbManager.addMsg(new * MessageItem(time, 1, Integer.parseInt(type), mp4[i], * account)); } */ } else if (type.equals("1")) {// 图片 String[] jpg = content.split(";"); // Log.i(TAG, "type jpg.length: " + jpg.length); for (int i = 0; i < jpg.length; i++) { dbManager.addMsg(new MessageItem(time, 1, Integer .parseInt(type), jpg[i], account)); } } else { dbManager.addMsg(new MessageItem(time, 1, Integer .parseInt(type), content, account)); } MainActivity.mMsgHandler.sendEmptyMessage(0); MyNotification.getInstance(mContext).custom( dbManager.getMsgNumber(), content); MediaPlayer mMediaPlayer; mMediaPlayer = MediaPlayer.create(mContext, R.raw.videorecord); mMediaPlayer.setLooping(false); try { mMediaPlayer.prepare(); } catch (Exception e) { e.printStackTrace(); } mMediaPlayer.start(); } else if (arg1.getAction().equals("DELETE_MESSAGE_ACTION")) { // Toast.makeText(mContext, "2", Toast.LENGTH_SHORT).show(); List<MessageItem> itemList = dbManager.getMsgItem(); for (MessageItem item : itemList) { try { // Toast.makeText(mContext, "3", // Toast.LENGTH_SHORT).show(); if (mConnection != null) { // Toast.makeText(mContext, // "发送 time="+item.getmsgTime()+" size="+item.getmsgTime().length(), // Toast.LENGTH_LONG).show(); mConnection.publishToTopic( MyState.getInstance().ucmUser + "_cb", item.getmsgTime() + ""); // Toast.makeText(mContext, // "发过去了 "+MyState.getInstance().ucmUser+"_cb"+item.getmsgTime(), // Toast.LENGTH_SHORT).show(); } else { Toast.makeText(mContext, "连接断开了", Toast.LENGTH_SHORT).show(); } } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); Log.e("ttt", "kkkkkk bad"); } } dbManager.deleteMsg(); } } }; // Display the topbar notification private void showNotification(String text) { Log.i(TAG, "showNotification"); Notification n = new Notification(); n.flags |= Notification.FLAG_SHOW_LIGHTS; n.flags |= Notification.FLAG_AUTO_CANCEL; n.defaults = Notification.DEFAULT_ALL; n.icon = R.drawable.logo; n.when = System.currentTimeMillis(); // Simply open the parent activity PendingIntent pi = PendingIntent.getActivity(this, 0, new Intent(this, MainActivity.class), 0); // Change the name of the notification here n.setLatestEventInfo(this, NOTIF_TITLE, text, pi); mNotifMan.notify(NOTIF_CONNECTED, n); } // Check if we are online private boolean isNetworkAvailable() { NetworkInfo info = mConnMan.getActiveNetworkInfo(); if (info != null) { return info.isConnected(); } return false; } // This inner class is a wrapper on top of MQTT client. private class MQTTConnection implements MqttSimpleCallback { IMqttClient mqttClient = null; // Creates a new connection given the broker address and initial topic public MQTTConnection(String brokerHostName, String initTopic) throws MqttException { // Create connection spec Log.i(TAG, "brokerHostName: " + brokerHostName + "initTopic: " + initTopic); String mqttConnSpec = "tcp://" + brokerHostName + "@" + MQTT_BROKER_PORT_NUM; // Create the client and connect mqttClient = new MqttClient(BROKER_URL,MQTT_PERSISTENCE); // mqttClient = MqttClient.createMqttClient(mqttConnSpec, // MQTT_PERSISTENCE); String deviceId = mPrefs.getString(PREF_DEVICE_ID, ""); String clientID = MQTT_CLIENT_ID + "/" + deviceId; Log.i(TAG, "clientID: " + clientID); String account = MyState.getInstance().ucmUser; clientID = "MT/" + account + "_" + Integer.toString(index); Log.i(TAG, "connect, clientID: " + clientID + ", cleanstart: " + MQTT_CLEAN_START + ", keepalive: " + MQTT_KEEP_ALIVE); mqttClient.connect(clientID, MQTT_CLEAN_START, MQTT_KEEP_ALIVE); // register this client app has being able to receive messages mqttClient.registerSimpleHandler(this); // Subscribe to an initial topic, which is combination of client ID // and device ID. initTopic = MyState.getInstance().ucmUser;// + "/+"; //initTopic = clientID + "@" + initTopic; Log.i(TAG, "subscribeToTopic initTopic: " + initTopic); /* * Message msg = new Message(); msg.what = 4; msg.obj = initTopic; * hander.sendMessage(msg); */ subscribeToTopic(initTopic); // cancelReconnect(); // Save start time mStartTime = System.currentTimeMillis(); // Star the keep-alives startKeepAlives(); // sendKeepAlive(); } // Disconnect public void disconnect() { Log.i(TAG, "disconnect"); try { stopKeepAlives(); mqttClient.disconnect(); mqttClient = null; } catch (MqttPersistenceException e) { Log.e(TAG, "disconnect MqttException: " + (e.getMessage() != null ? e.getMessage() : " NULL")); } } /* * Send a request to the message broker to be sent messages published * with the specified topic name. Wildcards are allowed. */ private void subscribeToTopic(String topicName) c278 throws MqttException { Log.i(TAG, "subscribeToTopic, topicName:" + topicName); if ((mqttClient == null) || (mqttClient.isConnected() == false)) { // quick sanity check - don't try and subscribe if we don't have // a connection Log.e(TAG, "subscribeToTopic :Connection error" + "No connection"); } else { String[] topics = { topicName }; int subscribe = mqttClient.subscribe(topics,MQTT_QUALITIES_OF_SERVICE); Message msg = StatusBarManager.getInstance(mContext).mHandler .obtainMessage(); msg.what = StatusBarManager.MSG_STATUS_IMAGE_MSG; msg.arg1 = subscribe; StatusBarManager.getInstance(mContext).onStatusChange(msg); Log.e(TAG, "subscribe: " + subscribe + " topicName:" + topicName); Log.e(TAG, "subscribeToTopic :topic=" + topicName); } } /* * Sends a message to the message broker, requesting that it be * published to the specified topic. */ public void publishToTopic(String topicName, String message) throws MqttException { if ((mqttClient == null) /* || (mqttClient.isConnected() == false) */) { // quick sanity check - don't try and publish if we don't have // a connection Log.e(TAG, "publishToTopic, mqttClient is null"); // connect(); } else { mqttClient.publish(topicName, message.getBytes(), MQTT_QUALITY_OF_SERVICE, MQTT_RETAINED_PUBLISH); Log.i(TAG, "publishToTopic, topicName: " + topicName + ", message: " + message); } } /* * Called if the application loses it's connection to the message * broker. */ public void connectionLost() throws Exception { Log.e(TAG, "connectionLost" + " connection downed"); stopKeepAlives(); mConnection = null; new Thread() { public void run() { int i = 0; while (i < 10) { if (isNetworkAvailable() == true) { Log.e(TAG, "network available, reconnectIfNecessary"); reconnectIfNecessary(); break; } else { Log.e(TAG, "network not available, sleep"); i++; try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } }.start(); } /* * Called when we receive a message from the message broker. */ public void publishArrived(String topicName, byte[] payload, int qos, boolean retained) { // 如果没在群组内,直接return,只有在群组中才能接收消息 if (!sta.isgrp) { return; } // Show a notification String msg = new String(payload); // Toast.makeText(mContext, s, Toast.LENGTH_LONG).show(); // showNotification(s); int msgNumber = mContext.getSharedPreferences("msg", 0).getInt( "msgNumber", 0); msgNumber++;// 每来一次消息推送,消息数量加1 // Log.i(TAG, "msgNumber: " + msgNumber); SharedPreferences sp = getSharedPreferences("msg", 0); sp.edit().putInt("msgNumber", msgNumber).commit(); int badgeCount = msgNumber; ShortcutBadger.applyCount(mContext, badgeCount); Log.e(TAG, "cpublishArrived topic: " + topicName + ", msg: " + msg); if (msg.length() > 17) { String time = msg.substring(0, 16); // Toast.makeText(mContext, // "接收 time="+time+" size="+time.length(), // Toast.LENGTH_LONG).show(); String type = msg.substring(16, 17); String content = msg.substring(17); Intent i = new Intent("MESSAGE_ARRIVE_ACTION"); i.putExtra("account", topicName); i.putExtra("time", time); i.putExtra("type", type); i.putExtra("content", content); mContext.sendBroadcast(i); // Log.i(TAG, "sendBroadcast over"); } } public void sendKeepAlive() throws MqttException { Log.i(TAG, "Sending keepalive"); // publish to a keep-alive topic publishToTopic(MQTT_CLIENT_ID + "/keepalive", mPrefs.getString(PREF_DEVICE_ID, "123")); } } }需要请自行下载 .jar都在demo里面,包括java服务端和Android 客户端
相关文章推荐
- 用ActiveMQ+MQTT实现Android点对点消息通知
- 使用ActiveMQ+MQTT实现Android点对点消息通知
- Android消息推送(二)--基于MQTT协议实现的推送功能
- 如何采用MQTT协议实现android消息推送
- 使用ActiveMQ+MQTT实现Android点对点消息通知
- 采用MQTT协议实现Android消息推送
- 采用MQTT协议实现Android推送
- Android推送通知的实现--采用MQTT协议实现Android消息推送
- 用MQTT协议实现android消息推送
- Android消息推送(二)--基于MQTT协议实现的推送功能
- 采用MQTT协议实现android消息推送
- 采用MQTT协议实现Android消息推送
- 采用MQTT协议实现Android消息推送
- 采用MQTT协议实现Android消息推送
- 采用MQTT协议实现Android消息推送
- Android消息推送(二)--基于MQTT协议实现的推送功能
- 采用MQTT协议实现Android推送
- 采用MQTT协议实现Android消息推送
- 采用MQTT协议实现Android消息推送
- Android推送通知的实现--采用MQTT协议实现Android消息推送