您的位置:首页 > 编程语言 > Qt开发

MQTT协议的简单介绍和服务器的安装

2015-03-02 09:36 585 查看
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。由于公司开发的软件里面用到MQTT协议,所以拿来研究一下,相对于XMPP,MQTT更加轻量级,并且占用用户很少的带宽。
MQTT的官网见:http://mqtt.org/。其中http://mqtt.org/software里面提供了官方推荐的各种服务器和客户端使用的各种语言版本的API。

下面以服务器Apollo 1.6为例,之前尝试过使用ActiveMQ,效果很不理想,只能实现服务器和客户端一对一的通信,从官网上了解到Apollo属于activemq的一个子工程。先不管这些了,言归正传,以下在windows环境下。

1、在这里下载Apollo服务器,下载后解压,然后运行apache-apollo-1.6\bin\apollo.cmd,输入create mybroker(名字任意取,这里是根据官网介绍的来取的)创建服务器实例,服务器实例包含了所有的配置,运行时数据等,并且和一个服务器进程关联。

2、create mybroker之后会在bin目录下生成mybroker文件夹,里面包含有很多信息,其中etc\apollo.xml文件下是配置服务器信息的文件,etc\users.properties文件包含连接MQTT服务器时用到的用户名和密码,后面会介绍,可以修改原始的admin=password,可以接着换行添加新的用户名密码。

3、打开cmd,运行…apache-apollo-1.6\bin\mybroker\bin\apollo-broker.cmd run 开启服务器,可以在浏览器中输入http://127.0.0.1:61680/查看是否安装成功,该界面展示了topic,连接数等很多信息。

经过上面的简单步骤,服务器基本上就已经完成,下一篇将介绍Android客户端的编写和注意事项。

客户端使用的API,开始我使用的是mqtt-client,使用过后发现问题百出,不能很好的满足要求,后来使用了官方推荐的Eclipse Paho,下面开始客户端代码的编写,为了方便测试这里有android和j2se两个工程:

1、新建android工程MQTTClient

2、MainActivity代码如下:

[java]
view plaincopyprint?





package ldw.mqttclient;  
  
import java.util.concurrent.Executors;  
import java.util.concurrent.ScheduledExecutorService;  
import java.util.concurrent.TimeUnit;  
  
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;  
import org.eclipse.paho.client.mqttv3.MqttCallback;  
import org.eclipse.paho.client.mqttv3.MqttClient;  
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;  
import org.eclipse.paho.client.mqttv3.MqttException;  
import org.eclipse.paho.client.mqttv3.MqttMessage;  
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;  
  
import android.app.Activity;  
import android.os.Bundle;  
import android.os.Handler;  
import android.os.Message;  
import android.view.KeyEvent;  
import android.widget.TextView;  
import android.widget.Toast;  
  
public class MainActivity extends Activity {  
  
    private TextView resultTv;  
  
    private String host = "tcp://127.0.0.1:1883";  
    private String userName = "admin";  
    private String passWord = "password";  
  
    private Handler handler;  
  
    private MqttClient client;  
  
    private String myTopic = "test/topic";  
  
    private MqttConnectOptions options;  
  
    private ScheduledExecutorService scheduler;  
  
    @Override  
    protected void onCreate(Bundle savedInstanceState) {  
        super.onCreate(savedInstanceState);  
        setContentView(R.layout.main);  
  
        resultTv = (TextView) findViewById(R.id.result);  
  
        init();  
  
        handler = new Handler() {  
            @Override  
            public void handleMessage(Message msg) {  
                super.handleMessage(msg);  
                if(msg.what == 1) {  
                    Toast.makeText(MainActivity.this, (String) msg.obj,  
                            Toast.LENGTH_SHORT).show();  
                    System.out.println("-----------------------------");  
                } else if(msg.what == 2) {  
                    Toast.makeText(MainActivity.this, "连接成功", Toast.LENGTH_SHORT).show();  
                    try {  
                        client.subscribe(myTopic, 1);  
                    } catch (Exception e) {  
                        e.printStackTrace();  
                    }  
                } else if(msg.what == 3) {  
                    Toast.makeText(MainActivity.this, "连接失败,系统正在重连", Toast.LENGTH_SHORT).show();  
                }  
            }  
        };  
  
        startReconnect();  
  
    }  
  
    private void startReconnect() {  
        scheduler = Executors.newSingleThreadScheduledExecutor();  
        scheduler.scheduleAtFixedRate(new Runnable() {  
  
            @Override  
            public void run() {  
                if(!client.isConnected()) {  
                    connect();  
                }  
            }  
        }, 0 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);  
    }  
  
    private void init() {  
        try {  
                       //host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存  
            client = new MqttClient(host, "test",  
                    new MemoryPersistence());  
                       //MQTT的连接设置  
            options = new MqttConnectOptions();  
                       //设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接  
            options.setCleanSession(true);  
                       //设置连接的用户名  
            options.setUserName(userName);  
                       //设置连接的密码  
            options.setPassword(passWord.toCharArray());  
            // 设置超时时间 单位为秒  
            options.setConnectionTimeout(10);  
            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制  
            options.setKeepAliveInterval(20);  
                        //设置回调  
            client.setCallback(new MqttCallback() {  
  
                @Override  
                public void connectionLost(Throwable cause) {  
                                        //连接丢失后,一般在这里面进行重连  
                    System.out.println("connectionLost----------");  
                }  
  
                @Override  
                public void deliveryComplete(IMqttDeliveryToken token) {  
                                        //publish后会执行到这里  
                    System.out.println("deliveryComplete---------"  
                            + token.isComplete());  
                }  
  
                @Override  
                public void messageArrived(String topicName, MqttMessage message)  
                        throws Exception {  
                                        //subscribe后得到的消息会执行到这里面  
                    System.out.println("messageArrived----------");  
                    Message msg = new Message();  
                    msg.what = 1;  
                    msg.obj = topicName+"---"+message.toString();  
                    handler.sendMessage(msg);  
                }  
            });  
//          connect();  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
  
    private void connect() {  
        new Thread(new Runnable() {  
  
            @Override  
            public void run() {  
                try {  
                    client.connect(options);  
                    Message msg = new Message();  
                    msg.what = 2;  
                    handler.sendMessage(msg);  
                } catch (Exception e) {  
                    e.printStackTrace();  
                    Message msg = new Message();  
                    msg.what = 3;  
                    handler.sendMessage(msg);  
                }  
            }  
        }).start();  
    }  
  
    @Override  
    public boolean onKeyDown(int keyCode, KeyEvent event) {  
        if(client != null && keyCode == KeyEvent.KEYCODE_BACK) {  
            try {  
                client.disconnect();  
            } catch (Exception e) {  
                e.printStackTrace();  
            }  
        }  
        return super.onKeyDown(keyCode, event);  
    }  
  
    @Override  
    protected void onDestroy() {  
        super.onDestroy();  
        try {  
            scheduler.shutdown();  
            client.disconnect();  
        } catch (MqttException e) {  
            e.printStackTrace();  
        }  
    }  
}  

package ldw.mqttclient;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import android.app.Activity;
import android.os.Bundle;
import android.os.Handler;
import android.os.Message;
import android.view.KeyEvent;
import android.widget.TextView;
import android.widget.Toast;

public class MainActivity extends Activity {

private TextView resultTv;

private String host = "tcp://127.0.0.1:1883";
private String userName = "admin";
private String passWord = "password";

private Handler handler;

private MqttClient client;

private String myTopic = "test/topic";

private MqttConnectOptions options;

private ScheduledExecutorService scheduler;

@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.main);

resultTv = (TextView) findViewById(R.id.result);

init();

handler = new Handler() {
@Override
public void handleMessage(Message msg) {
super.handleMessage(msg);
if(msg.what == 1) {
Toast.makeText(MainActivity.this, (String) msg.obj,
Toast.LENGTH_SHORT).show();
System.out.println("-----------------------------");
} else if(msg.what == 2) {
Toast.makeText(MainActivity.this, "连接成功", Toast.LENGTH_SHORT).show();
try {
client.subscribe(myTopic, 1);
} catch (Exception e) {
e.printStackTrace();
}
} else if(msg.what == 3) {
Toast.makeText(MainActivity.this, "连接失败,系统正在重连", Toast.LENGTH_SHORT).show();
}
}
};

startReconnect();

}

private void startReconnect() {
scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
if(!client.isConnected()) {
connect();
}
}
}, 0 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);
}

private void init() {
try {
//host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
client = new MqttClient(host, "test",
new MemoryPersistence());
//MQTT的连接设置
options = new MqttConnectOptions();
//设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
//设置连接的用户名
options.setUserName(userName);
//设置连接的密码
options.setPassword(passWord.toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
//设置回调
client.setCallback(new MqttCallback() {

@Override
public void connectionLost(Throwable cause) {
//连接丢失后,一般在这里面进行重连
System.out.println("connectionLost----------");
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
//publish后会执行到这里
System.out.println("deliveryComplete---------"
+ token.isComplete());
}

@Override
public void messageArrived(String topicName, MqttMessage message)
throws Exception {
//subscribe后得到的消息会执行到这里面
System.out.println("messageArrived----------");
Message msg = new Message();
msg.what = 1;
msg.obj = topicName+"---"+message.toString();
handler.sendMessage(msg);
}
});
//			connect();
} catch (Exception e) {
e.printStackTrace();
}
}

private void connect() {
new Thread(new Runnable() {

@Override
public void run() {
try {
client.connect(options);
Message msg = new Message();
msg.what = 2;
handler.sendMessage(msg);
} catch (Exception e) {
e.printStackTrace();
Message msg = new Message();
msg.what = 3;
handler.sendMessage(msg);
}
}
}).start();
}

@Override
public boolean onKeyDown(int keyCode, KeyEvent event) {
if(client != null && keyCode == KeyEvent.KEYCODE_BACK) {
try {
client.disconnect();
} catch (Exception e) {
e.printStackTrace();
}
}
return super.onKeyDown(keyCode, event);
}

@Override
protected void onDestroy() {
super.onDestroy();
try {
scheduler.shutdown();
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
}


由于项目需要,我用到了心跳重连。根据这里的解释设置apollo.xml,主要有设置主机连接的地址。另外,options还有个setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。

3、新建j2se工程MQTTServer

4、Server代码如下:

[java]
view plaincopyprint?





import java.awt.Container;  
import java.awt.event.ActionEvent;  
import java.awt.event.ActionListener;  
  
import javax.swing.JButton;  
import javax.swing.JFrame;  
import javax.swing.JPanel;  
  
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;  
import org.eclipse.paho.client.mqttv3.MqttCallback;  
import org.eclipse.paho.client.mqttv3.MqttClient;  
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;  
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;  
import org.eclipse.paho.client.mqttv3.MqttMessage;  
import org.eclipse.paho.client.mqttv3.MqttTopic;  
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;  
  
public class Server extends JFrame {  
    private static final long serialVersionUID = 1L;  
    private JPanel panel;  
    private JButton button;  
  
    private MqttClient client;  
    private String host = "tcp://127.0.0.1:1883";  
//  private String host = "tcp://localhost:1883";  
    private String userName = "test";  
    private String passWord = "test";  
    private MqttTopic topic;  
    private MqttMessage message;  
  
    private String myTopic = "test/topic";  
  
    public Server() {  
  
        try {  
            client = new MqttClient(host, "Server",  
                    new MemoryPersistence());  
            connect();  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
  
        Container container = this.getContentPane();  
        panel = new JPanel();  
        button = new JButton("发布话题");  
        button.addActionListener(new ActionListener() {  
  
            @Override  
            public void actionPerformed(ActionEvent ae) {  
                try {  
                    MqttDeliveryToken token = topic.publish(message);  
                    token.waitForCompletion();  
                    System.out.println(token.isComplete()+"========");  
                } catch (Exception e) {  
                    e.printStackTrace();  
                }  
            }  
        });  
        panel.add(button);  
        container.add(panel, "North");  
  
    }  
  
    private void connect() {  
  
        MqttConnectOptions options = new MqttConnectOptions();  
        options.setCleanSession(false);  
        options.setUserName(userName);  
        options.setPassword(passWord.toCharArray());  
        // 设置超时时间  
        options.setConnectionTimeout(10);  
        // 设置会话心跳时间  
        options.setKeepAliveInterval(20);  
        try {  
            client.setCallback(new MqttCallback() {  
  
                @Override  
                public void connectionLost(Throwable cause) {  
                    System.out.println("connectionLost-----------");  
                }  
  
                @Override  
                public void deliveryComplete(IMqttDeliveryToken token) {  
                    System.out.println("deliveryComplete---------"+token.isComplete());  
                }  
  
                @Override  
                public void messageArrived(String topic, MqttMessage arg1)  
                        throws Exception {  
                    System.out.println("messageArrived----------");  
  
                }  
            });  
  
            topic = client.getTopic(myTopic);  
  
            message = new MqttMessage();  
            message.setQos(1);  
            message.setRetained(true);  
            System.out.println(message.isRetained()+"------ratained状态");  
            message.setPayload("eeeeeaaaaaawwwwww---".getBytes());  
  
            client.connect(options);  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
  
    }  
  
    public static void main(String[] args) {  
        Server s = new Server();  
        s.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);  
        s.setSize(600, 370);  
        s.setLocationRelativeTo(null);  
        s.setVisible(true);  
    }  
}  

import java.awt.Container;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;

import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JPanel;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class Server extends JFrame {
private static final long serialVersionUID = 1L;
private JPanel panel;
private JButton button;

private MqttClient client;
private String host = "tcp://127.0.0.1:1883";
//	private String host = "tcp://localhost:1883";
private String userName = "test";
private String passWord = "test";
private MqttTopic topic;
private MqttMessage message;

private String myTopic = "test/topic";

public Server() {

try {
client = new MqttClient(host, "Server",
new MemoryPersistence());
connect();
} catch (Exception e) {
e.printStackTrace();
}

Container container = this.getContentPane();
panel = new JPanel();
button = new JButton("发布话题");
button.addActionListener(new ActionListener() {

@Override
public void actionPerformed(ActionEvent ae) {
try {
MqttDeliveryToken token = topic.publish(message);
token.waitForCompletion();
System.out.println(token.isComplete()+"========");
} catch (Exception e) {
e.printStackTrace();
}
}
});
panel.add(button);
container.add(panel, "North");

}

private void connect() {

MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
// 设置超时时间
options.setConnectionTimeout(10);
// 设置会话心跳时间
options.setKeepAliveInterval(20);
try {
client.setCallback(new MqttCallback() {

@Override
public void connectionLost(Throwable cause) {
System.out.println("connectionLost-----------");
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------"+token.isComplete());
}

@Override
public void messageArrived(String topic, MqttMessage arg1)
throws Exception {
System.out.println("messageArrived----------");

}
});

topic = client.getTopic(myTopic);

message = new MqttMessage();
message.setQos(1);
message.setRetained(true);
System.out.println(message.isRetained()+"------ratained状态");
message.setPayload("eeeeeaaaaaawwwwww---".getBytes());

client.connect(options);
} catch (Exception e) {
e.printStackTrace();
}

}

public static void main(String[] args) {
Server s = new Server();
s.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
s.setSize(600, 370);
s.setLocationRelativeTo(null);
s.setVisible(true);
}
}


上面代码跟客户端的代码差不多,这里就不做解释了。

没什么好说的,MQTT就是这么简单,但开始在使用的时候要注意一些参数的设置来适应项目的需求。

jar包下载地址:
https://repo.eclipse.org/content/repositories/paho/org/eclipse/paho/mqtt-client/0.4.0/
转自:http://www.longdw.com/mqtt-server-client-android/
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: