
MQTT Study Notes (7): Using HTTP POST to Publish Messages to the MQTT Server

2015-07-01 17:07
See the official documentation:

http://publib.boulder.ibm.com/infocenter/wmqv7/v7r0/topic/com.ibm.mq.csqzau.doc/ts21220_.htm

HTTP POST puts a message to a queue, or a publication to a topic. The HTTPPOST Java sample is an example of an HTTP POST request that puts a message on a queue. Instead of using Java, you could create an HTTP POST request using a browser form or an AJAX toolkit.

Figure 1 shows an HTTP request to put a message on a queue called myQueue. This request contains the HTTP header x-msg-correlId to set the correlation ID of the WebSphere MQ message.

Figure 1. Example of an HTTP POST request to a queue
POST /msg/queue/myQueue/ HTTP/1.1
Host: www.example.org
Content-Type: text/plain
x-msg-correlId: 1234567890
Content-Length: 50

Here's my message body that will appear on the queue.


Figure 2 shows the response sent back to the client. There is no response content.

Figure 2. Example of an HTTP POST response
HTTP/1.1 200 OK
Date: Wed, 2 Jan 2007 22:38:34 GMT
Server: Apache-Coyote/1.1 WMQ-HTTP/1.1 JEE-Bridge/1.1
Content-Length: 0
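
To make the layout of Figure 1 concrete, the following is a minimal sketch that assembles the same kind of request text by hand. The class name RawPostRequest and the method build are hypothetical and exist only for illustration. The sketch assumes the body is encoded as UTF-8 and derives Content-Length from the encoded bytes rather than hard-coding it.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: assembles the text of an HTTP POST request to a queue,
// following the header layout shown in Figure 1.
final class RawPostRequest {
    private static final String CRLF = "\r\n";

    static String build(String host, String queueName, String correlId, String body) {
        // Assumption: the body is sent as UTF-8, so Content-Length counts UTF-8 bytes.
        int length = body.getBytes(StandardCharsets.UTF_8).length;
        return "POST /msg/queue/" + queueName + "/ HTTP/1.1" + CRLF
                + "Host: " + host + CRLF
                + "Content-Type: text/plain" + CRLF
                + "x-msg-correlId: " + correlId + CRLF
                + "Content-Length: " + length + CRLF
                + CRLF   // blank line separates headers from the body
                + body;
    }

    public static void main(String[] args) {
        System.out.print(build("www.example.org", "myQueue", "1234567890", "hello queue"));
    }
}
```

In practice an HTTP client library writes these lines for you; the sketch only shows which parts of the request carry the queue name, the correlation ID, and the message body.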


Request and response protocol formats

The HTTP POST operation puts a message on a WebSphere® MQ queue, or publishes a message to a topic.

Syntax

Request

>>-POST-- --| Path |-- --HTTP version--CRLF--------------------->

   .-CRLF---------------.  .-CRLF---------------.
   V                    |  V                    |
>----+----------------+-+----+----------------+-+--------------->
     '-general-header-'      '-request-header-'

   .-CRLF----------------------------.        .-CRLF----.
   V                                 |        V         |
>----+-----------------------------+-+--CRLF----Message-+------><
     '-| entity header (Request) |-'

Path

|--/--contextRoot--/-------------------------------------------->

>--msg/--+-queue/--queueName--+-------------+-+--/--------------|
         |                    '-@--qMgrName-' |
         '-topic/--topicName------------------'

entity-header (Request)

|--+----------------------------------------------+-------------|
   +-standard entity-header-- --entity-value------+
   +-x-msg-class-- --message type-----------------+
   +-x-msg-correlId-- --correlation ID------------+
   +-x-msg-encoding-- --encoding type-------------+
   +-x-msg-expiry-- --duration--------------------+
   +-x-msg-format-- --message format--------------+
   +-x-msg-msgId-- --message ID-------------------+
   +-x-msg-persistence-- --persistence------------+
   +-x-msg-priority-- --priority class------------+
   +-x-msg-replyTo-- --reply-to queue-------------+
   +-x-msg-require-headers-- --entity header name-+
   '-x-msg-usr-- --user properties----------------'


Note:
If a question mark (?) is used it must be substituted with %3f. For example, orange?topic should be specified as orange%3ftopic.

@qMgrName is only valid on an HTTP POST.
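
The Path syntax and the note about question marks can be sketched as a small path builder. This is an illustrative sketch only: the class MqHttpPaths and its method names are hypothetical, and it handles nothing beyond the '?' substitution described in the note (other characters that need escaping in a URL are not covered).

```java
// Hypothetical helper that assembles request paths following the Path syntax:
//   /contextRoot/msg/queue/queueName[@qMgrName]/
//   /contextRoot/msg/topic/topicName/
final class MqHttpPaths {

    // Replaces '?' with %3f, as the note requires.
    static String escapeQuestionMark(String name) {
        return name.replace("?", "%3f");
    }

    // qMgrName is optional; pass null or "" to omit the @qMgrName part.
    static String queuePath(String contextRoot, String queueName, String qMgrName) {
        StringBuilder path = new StringBuilder("/")
                .append(contextRoot)
                .append("/msg/queue/")
                .append(escapeQuestionMark(queueName));
        if (qMgrName != null && !qMgrName.isEmpty()) {
            path.append('@').append(qMgrName);
        }
        return path.append('/').toString();
    }

    static String topicPath(String contextRoot, String topicName) {
        return "/" + contextRoot + "/msg/topic/" + escapeQuestionMark(topicName) + "/";
    }

    public static void main(String[] args) {
        System.out.println(queuePath("mq", "myQueue", "QM1"));
        System.out.println(topicPath("mq", "orange?topic"));
    }
}
```

Because @qMgrName is only valid on an HTTP POST, queuePath with a queue manager name should only be used when building POST requests.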

Response

>>-HTTP version-- --HTTP Status-Code-- --HTTP Reason-Phrase--CRLF-->

   .-CRLF---------------.  .-CRLF----------------.
   V                    |  V                     |
>----+----------------+-+----+-----------------+-+-------------->
     '-general-header-'      '-response-header-'

   .-CRLF-----------------------------.
   V                                  |
>----+------------------------------+-+------------------------><
     '-| entity-header (Response) |-'

entity-header (Response)

|--+-----------------------------------------+------------------|
   +-standard entity-header-- --entity-value-+
   +-x-msg-class-- --message type------------+
   +-x-msg-correlId-- --correlation ID-------+
   +-x-msg-encoding-- --encoding type--------+
   +-x-msg-expiry-- --duration---------------+
   +-x-msg-format-- --message format---------+
   +-x-msg-msgId-- --message ID--------------+
   +-x-msg-persistence-- --persistence-------+
   +-x-msg-priority-- --priority class-------+
   +-x-msg-replyTo-- --reply-to queue--------+
   +-x-msg-timestamp-- --HTTP-date-----------+
   '-x-msg-usr-- --user properties-----------'
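
On the client side, the x-msg-* entity headers of a response can be picked out of the header map that HttpURLConnection.getHeaderFields() returns. The following is a minimal sketch under that assumption; the class MsgHeaders and the method extract are hypothetical names. Header names are lowercased because HTTP header names are case-insensitive, and only the first value of each header is kept.

```java
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: collects the x-msg-* headers from a response header map.
final class MsgHeaders {

    static Map<String, String> extract(Map<String, List<String>> headerFields) {
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, List<String>> entry : headerFields.entrySet()) {
            String name = entry.getKey();
            List<String> values = entry.getValue();
            // getHeaderFields() maps the status line to a null key; skip it.
            if (name == null || values == null || values.isEmpty()) {
                continue;
            }
            String lower = name.toLowerCase(Locale.ROOT);
            if (lower.startsWith("x-msg-")) {
                result.put(lower, values.get(0));
            }
        }
        return result;
    }
}
```

A typical call would be MsgHeaders.extract(connection.getHeaderFields()) after the response code has been read. Which of these headers a given response actually carries depends on the server; Figure 2, for example, shows a response with none of them.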


The HTTP POST approach can be implemented as follows:

Java code


package com.etrip.mqttv3.http;

/*
 * Adapted from the HTTPPOST sample, which shows how to post a message. The
 * original sample has the same behaviour as the amqsput command: it reads
 * lines from the command line and puts them to the queue as non-persistent
 * String messages with UNLIMITED expiry and LOW (0) priority, and terminates
 * on EOF (^Z on Windows) or a blank line.
 * Usage of the original sample:
 *   java HTTPPOST <Queue (default=SYSTEM.DEFAULT.LOCAL.QUEUE)>
 *                 <host:port (default localhost:8080)>
 *                 <context-root (the MQ Bridge for HTTP's context-root)>
 * Unlike that sample, this version posts a single message passed as a parameter.
 */

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;

/**
 * Publishes messages using HTTP POST.
 *
 * The HTTP POST operation puts a message on a WebSphere MQ queue, or publishes
 * a message to a topic.
 *
 * Path used to publish a message to a topic or queue:
 *   /contextRoot/msg/queue/queueName[@qMgrName]/
 *   /contextRoot/msg/topic/topicName/
 *
 * @author longgangbai
 */
public class HTTPPOST
{
    private static final String DEFAULT_HOST = "localhost";
    private static final String DEFAULT_PORT = "8080";
    private static final String DEFAULT_QUEUE = "SYSTEM.DEFAULT.LOCAL.QUEUE";
    private static final String DEFAULT_CONTEXT_ROOT = "mq";
    private static final String CRLF = "\r\n";

    public static final int MALFORMED_URL_EXCEPTION_RC = -1;
    public static final int END_IOEXCEPTION_RC = -2;

    /**
     * Local replacement for the StringUtils helper the original listing
     * called without importing it.
     */
    private static boolean isEmpty(String value) {
        return value == null || value.trim().isEmpty();
    }

    /**
     * Builds the URL used to post a message to a queue. Any argument that is
     * null or blank falls back to its default.
     *
     * @param host      host name of the HTTP bridge
     * @param port      port of the HTTP bridge
     * @param context   context root of the HTTP bridge
     * @param queueName name of the target queue
     */
    private static String getPublishQueueURL(String host, String port,
            String context, String queueName) {
        StringBuilder urlString = new StringBuilder("http://");
        if (isEmpty(host)) {
            host = DEFAULT_HOST;
        }
        if (isEmpty(port)) {
            port = DEFAULT_PORT;
        }
        urlString.append(host).append(":").append(port);
        if (isEmpty(context)) {
            context = DEFAULT_CONTEXT_ROOT;
        }
        urlString.append("/");
        urlString.append(context);
        urlString.append("/msg/queue/");
        // Fall back to the default queue only when no queue name was given.
        // (The original listing overwrote the caller's queue name unconditionally.)
        if (isEmpty(queueName)) {
            queueName = DEFAULT_QUEUE;
        }
        urlString.append(queueName);
        System.out.println("urlString=" + urlString);
        return urlString.toString();
    }

    /**
     * Posts a message to a queue. Despite its name, this method targets the
     * /msg/queue/ path, not /msg/topic/.
     *
     * @param host      host name of the HTTP bridge
     * @param port      port of the HTTP bridge
     * @param context   context root of the HTTP bridge
     * @param queueName name of the target queue
     * @param message   message body
     * @return true if the server answered 200, false otherwise
     */
    public static boolean publishTopic(String host, String port, String context,
            String queueName, String message) {
        boolean response = true;
        HttpURLConnection connection = null;
        try {
            String publishURL = getPublishQueueURL(host, port, context, queueName);
            URL url = new URL(publishURL);
            connection = (HttpURLConnection) url.openConnection();

            /* Build the headers */
            // the verb first
            connection.setRequestMethod("POST");
            // Content type is a string message
            connection.setRequestProperty("content-type", "text/plain");
            // set the message priority to low
            connection.setRequestProperty("x-msg-priority", "LOW");
            // Ensure we can get the output stream from the connection
            connection.setDoOutput(true);

            OutputStream outputStream = connection.getOutputStream();
            // wrap the output stream in a writer; closing it completes the body
            try (BufferedWriter writer = new BufferedWriter(
                    new OutputStreamWriter(outputStream))) {
                // Now write the actual content.
                // Make sure the CRLF is there in case some HTTP servers don't
                // understand that it's the end of the message
                writer.write(message + CRLF + CRLF);
                writer.flush();
            }

            // getResponseCode() sends the request and waits for the response,
            // so no explicit connect() call is needed here
            int responseCode = connection.getResponseCode();
            if (responseCode != 200) {
                String responseMessage = connection.getResponseMessage();
                System.out.println("response code " + responseCode
                        + ", reason phrase = " + responseMessage);
                System.out.println("response content:");
                // getErrorStream() returns null when the server sent no error body
                InputStream errorStream = connection.getErrorStream();
                if (errorStream != null) {
                    try (BufferedReader reader = new BufferedReader(
                            new InputStreamReader(errorStream))) {
                        String line;
                        while ((line = reader.readLine()) != null) {
                            System.out.println(line);
                        }
                    }
                }
                response = false;
            } else {
                // Read the response headers
                String responseQueueName = connection.getHeaderField("x-msg-replyTo");
                System.out.println("responseQueueName=" + responseQueueName);
                System.out.println("response successful, reason phrase: "
                        + connection.getResponseMessage());
            }
        } catch (MalformedURLException e) {
            response = false;
            e.printStackTrace();
        } catch (IOException e) {
            response = false;
            e.printStackTrace();
        } finally {
            // connection is still null if building the URL failed
            if (connection != null) {
                connection.disconnect();
            }
        }
        return response;
    }

    public static void main(String[] args) {
        HTTPPOST.publishTopic("192.168.208.46", "8080", "mq", "java_lover", "this is a message ");
    }
}
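
The listing above posts to the /msg/queue/ path. Going by the Path syntax earlier in this post, publishing to a topic should only require switching to /msg/topic/topicName/. The following is a hedged sketch of that variant, not a tested client for the bridge: the class TopicPublisher and its methods are hypothetical, the body is assumed to be UTF-8, and only the '?' substitution from the note is applied to the topic name.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: prepares and sends an HTTP POST to a topic path.
final class TopicPublisher {

    // Configures the connection without contacting the server yet.
    static HttpURLConnection prepare(String host, int port, String contextRoot, String topicName) {
        String url = "http://" + host + ":" + port + "/" + contextRoot
                + "/msg/topic/" + topicName.replace("?", "%3f") + "/";
        try {
            HttpURLConnection connection =
                    (HttpURLConnection) URI.create(url).toURL().openConnection();
            connection.setRequestMethod("POST");
            connection.setRequestProperty("Content-Type", "text/plain");
            connection.setRequestProperty("x-msg-priority", "LOW");
            connection.setDoOutput(true);
            return connection;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes the body, then reads the status code, which triggers the exchange.
    static int send(HttpURLConnection connection, String message) throws IOException {
        try (OutputStream out = connection.getOutputStream()) {
            // Assumption: the message is encoded as UTF-8.
            out.write(message.getBytes(StandardCharsets.UTF_8));
        }
        try {
            return connection.getResponseCode();
        } finally {
            connection.disconnect();
        }
    }
}
```

A call such as TopicPublisher.send(TopicPublisher.prepare("localhost", 8080, "mq", "myTopic"), "hello") would then return the HTTP status code, with 200 indicating success as in Figure 2. The host, port, context root, and topic name here are placeholders.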

HTTP POST request and response protocol for publishing to a topic
http://publib.boulder.ibm.com/infocenter/wmqv7/v7r0/topic/com.ibm.mq.csqzau.doc/ts21220_.htm
Explanation of the request and response header fields
http://publib.boulder.ibm.com/infocenter/wmqv7/v7r0/topic/com.ibm.mq.csqzau.doc/ts21250_.htm

Handling error responses
http://publib.boulder.ibm.com/infocenter/wmqv7/v7r0/topic/com.ibm.mq.csqzau.doc/ts21340_.htm