您的位置:首页 > 编程语言 > PHP开发

HTPPSQS的学习总结

2016-09-12 18:53 399 查看
HTTPSQS(HTTP Simple Queue Service)是一款基于 HTTP GET/POST 协议的轻量级开源简单消息队列服务,使用 Tokyo Cabinet 的 B+Tree Key/Value 数据库来做数据的持久化存储。

特征:

HTTPSQS 具有以下特征:

● 非常简单,基于 HTTP GET/POST 协议。PHP、Java、Perl、Shell、Python、Ruby等支持HTTP协议的编程语言均可调用。

● 非常快速,入队列、出队列速度超过10000次/秒。

● 高并发,支持上万的并发连接,C10K不成问题。

● 支持多队列。

● 单个队列支持的最大队列数量高达10亿条。

● 低内存消耗,海量数据存储,存储几十GB的数据只需不到100MB的物理内存缓冲区。

● 可以在不停止服务的情况下便捷地修改单个队列的最大队列数量。

● 可以实时查看队列状态(入队列位置、出队列位置、未读队列数量、最大队列数量)。

● 可以查看指定队列ID(队列点)的内容,包括未出、已出的队列内容。

● 查看队列内容时,支持多字符集编码。

● 源代码不超过700行,整个软件不超过900KB,适合二次开发。

命令使用

开启制定端口号的httpsqs服务(默认端口号是1218):

httpsqs -d -p 9801 -x /opt/queue/httpsqs.db -a gwwrewsXssdf2

-a 代表需要使用密钥链接

-x 代表数据存放位置

-d 代表守护进程挂起

查询队列状态情况(普通方式):

curl “http://127.0.0.1:9801/?name=queueName&opt=status&auth=gwwrewsXssdf2

查询队列状态情况(json方式):

curl “http://127.0.0.1:9801/?name=queueName&opt=status_json&auth=gwwrewsXssdf2

入队:

get方式:

curl “http://127.0.0.1:9801/?name=queueName&opt=put&data=经过URL编码的文本消息&auth=gwwrewsXssdf2”

post方式:

curl -d “经过URL编码的文本消息” “http://127.0.0.1:9801/?name=queueName&opt=put&auth=gwwrewsXssdf2

出队:

curl “http://127.0.0.1:9801/?charset=utf-8&name=queueName&opt=get&auth=gwwrewsXssdf2

查看指定队列位置点的内容:

curl “http://127.0.0.1:9801/?charset=utf-8&name=queueName&opt=view&pos=5&auth=gwwrewsXssdf2

重置队列:

curl “http://127.0.0.1:9801/?charset=utf-8&name=queueName&opt=reset&auth=gwwrewsXssdf2

更改指定队列的最大数量:

curl “http://127.0.0.1:9801/?charset=utf-8&name=queueName&opt=maxqueue&num=1000000000&auth=gwwrewsXssdf2

代码使用

基类

/**
*
* Httpsqs4j基础类,用于设置连接信息及创建客户端对象
*
*/
public class Httpsqs4j {

protected static String prefix;

protected static String charset;

public static boolean configured = false;

/**
* 设置连接信息
*
* @param ip
* @param port
* @param charset
*            字符集
* @param auth
* @throws HttpsqsException
*/
public static void setConnectionInfo(String ip, int port, String charset,
String auth) throws HttpsqsException {
try {
"".getBytes(charset);
} catch (UnsupportedEncodingException e) {
throw new HttpsqsException("Unknown charset.", (Throwable) e);
}
URL url;
HttpURLConnection connection = null;
String prefix = "http://" + ip + ":" + port + "/";
try {
url = new URL(prefix);
connection = (HttpURLConnection) url.openConnection();
connection.connect();
} catch (IOException e) {
throw new HttpsqsException(prefix + " cannot located.",
(Throwable) e);
} finally {
if (connection != null) {
connection.disconnect();
}
}
Httpsqs4j.prefix = prefix + "?auth=" + auth + "&";
Httpsqs4j.charset = charset;
Httpsqs4j.configured = true;
}

/**
* 创建新的客户端对象
*
* @return
* @throws HttpsqsException
*/
public static HttpsqsClient createNewClient() throws HttpsqsException {
if (Httpsqs4j.configured == false) {
Properties properties;
try {
properties = loadProperties();
Httpsqs4j.setConnectionInfo(properties.getProperty("httpsqs.ip"),
Integer.parseInt(properties.getProperty("httpsqs.port")),
properties.getProperty("httpsqs.charset"), properties.getProperty("httpsqs.auth"));
} catch (IOException e) {
throw new HttpsqsException("加载配置文件异常");
}
}
return new HttpsqsClient();
}

private static Properties loadProperties() throws IOException {
Properties properties = new Properties();
properties.load(JedisPoolInstance.class.getClassLoader().getResourceAsStream("config.properties"));
return properties;
}
}


客户端类

/**
*
* 客户端类,使用Httpsqs4j类的createNewClient()方法创建,
* 创建前请先调用Httpsqs4j的setConnectionInfo设置连接信息
*
*/
public class HttpsqsClient {

private long pos;

protected HttpsqsClient() {

}

private String httpPost(String urlStr, String postData)
throws HttpsqsException {
return this.getSource(urlStr, postData);
}

private String httpGet(String urlStr) throws HttpsqsException {
return this.getSource(urlStr, null);
}

private String getSource(String urlStr, String postData)
throws HttpsqsException {
HttpURLConnection connection = null;
InputStream is = null;
InputStreamReader isr = null;
BufferedReader reader = null;
OutputStream os = null;
OutputStreamWriter osw = null;
StringBuffer sb = new StringBuffer();
try {
URL url = new URL(Httpsqs4j.prefix + urlStr);
connection = (HttpURLConnection) url.openConnection();
if (postData != null) {
try {
connection.setDoOutput(true);
os = connection.getOutputStream();
osw = new OutputStreamWriter(os);
osw.write(postData);
osw.flush();

da70
} catch (IOException e) {
throw new HttpsqsException("Send data error.",
(Throwable) e);
} finally {
if (osw != null) {
osw.close();
}
if (os != null) {
os.close();
}
}
}
is = connection.getInputStream();
isr = new InputStreamReader(is, Httpsqs4j.charset);
reader = new BufferedReader(isr);
String line = "";
while ((line = reader.readLine()) != null) {
sb.append(line).append('\n');
}
String pos = connection.getHeaderField("Pos");
if (pos != null) {
this.pos = Long.valueOf(pos);
}
} catch (IOException e) {
throw new HttpsqsException("Cannot connect to server.",
(Throwable) e);
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (isr != null) {
try {
isr.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (is != null) {
try {
is.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (connection != null) {
connection.disconnect();
}
}
String sbs = sb.toString();
if (sbs.contains("HTTPSQS_ERROR")) {
throw new HttpsqsException("Global error.");
}
return sb.toString();
}

/**
* 获取最后一次出入队列操作的位置值
*
* @return
*/
public long getLastPos() {
return this.pos;
}

/**
* 获取HttpSQS状态
*
* @param queueName
*            队列名称
* @return
* @throws HttpsqsException
*/
public HttpsqsStatus getStatus(String queueName) throws HttpsqsException {
String urlStr = "opt=status&name=" + queueName;
String source = this.httpGet(urlStr);
Matcher matcher = HttpsqsStatus.pattern.matcher(source);
if (matcher.find()) {
HttpsqsStatus status = new HttpsqsStatus();
status.version = matcher.group(1);
status.queueName = matcher.group(2);
status.maxNumber = Long.parseLong(matcher.group(3));
status.getLap = Long.parseLong(matcher.group(4));
status.getPosition = Long.parseLong(matcher.group(5));
status.putLap = Long.parseLong(matcher.group(6));
status.putPosition = Long.parseLong(matcher.group(7));
status.unreadNumber = Long.parseLong(matcher.group(8));
return status;
}
return null;
}

/**
* 将字符串加入队列
*
* @param queueName
*            队列名称
* @param str
*            字符串
* @throws HttpsqsException
*/
public void putString(String queueName, String str) throws HttpsqsException {
String urlStr = "opt=put&name=" + queueName;
String source = this.httpPost(urlStr, str);
if (source.contains("HTTPSQS_PUT_END")) {
throw new HttpsqsException("Queue [" + queueName + "] fulled.");
} else if (source.contains("HTTPSQS_PUT_ERROR")) {
throw new HttpsqsException("Put data to queue [" + queueName
+ "] failed.");
}
}

/**
* 将字符串出队列
*
* @param queueName
*            队列名称
* @return
* @throws HttpsqsException
*/
public String getString(String queueName) throws HttpsqsException {
String urlStr = "opt=get&charset=" + Httpsqs4j.charset + "&name="
+ queueName;
String source = this.httpGet(urlStr);
// if (source.contains("HTTPSQS_GET_END")) {
// throw new HttpsqsException("There's no data in queue [" + queueName +
// "].");
// }
return source;
}

/**
* 获取某位置的字符串
*
* @param queueName
*            队列名称
* @param pos
*            位置
* @return
* @throws HttpsqsException
*/
public String getStringAt(String queueName, long pos)
throws HttpsqsException {
if (pos < 1 || pos > 1000000000l) {
throw new HttpsqsException("Pos' out of range[1 - 1000000000].");
}
String urlStr = "opt=view&charset=" + Httpsqs4j.charset + "&name="
+ queueName + "&pos=" + pos;
return this.httpGet(urlStr);
}

/**
* 重置队列
*
* @param queueName
*            队列名称
* @return
* @throws HttpsqsException
*/
public boolean reset(String queueName) throws HttpsqsException {
String urlStr = "opt=reset&name=" + queueName;
String source = this.httpGet(urlStr);
return source.contains("HTTPSQS_RESET_OK");
}

/**
* 设置最大队列数量
*
* @param queueName
*            队列名称
* @param number
*            最大数量
* @return
* @throws HttpsqsException
*/
public boolean setMaxNumber(String queueName, long number)
throws HttpsqsException {
if (pos < 10 || pos > 1000000000l) {
throw new HttpsqsException("Pos' out of range[10 - 1000000000].");
}
String urlStr = "opt=maxqueue&name=" + queueName + "&num=" + number;
String source = this.httpGet(urlStr);
return source.contains("HTTPSQS_MAXQUEUE_OK");
}

}


状态标识类

public class HttpsqsStatus {

/**
* HttpSQS版本
*/
public String version;

/**
* 队列名称
*/
public String queueName;

/**
* 队列最大数量
*/
public long maxNumber;

/**
* 当前入队位置
*/
public long putPosition;

/**
* 当前入队圈数
*/
public long putLap;

/**
* 当前出队位置
*/
public long getPosition;

/**
* 当前出队圈数
*/
public long getLap;

/**
* 当前未出队数量
*/
public long unreadNumber;

protected static Pattern pattern = Pattern
.compile("HTTP Simple Queue Service v(.+?)\\s(?:.+?)\\sQueue Name: (.+?)\\sMaximum number of queues: (\\d+)\\sPut position of queue \\((\\d+)st lap\\): (\\d+)\\sGet position of queue \\((\\d+)st lap\\): (\\d+)\\sNumber of unread queue: (\\d+)");

}


自定义异常类

public class HttpsqsException extends Exception {

private static final long serialVersionUID = 1L;

public HttpsqsException() {
super();
}

public HttpsqsException(String message) {
super(message);
}

public HttpsqsException(String message, Throwable cause) {
super(message, cause);
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java httpsqs 队列