HTPPSQS的学习总结
2016-09-12 18:53
399 查看
HTTPSQS(HTTP Simple Queue Service)是一款基于 HTTP GET/POST 协议的轻量级开源简单消息队列服务,使用 Tokyo Cabinet 的 B+Tree Key/Value 数据库来做数据的持久化存储。
● 非常简单,基于 HTTP GET/POST 协议。PHP、Java、Perl、Shell、Python、Ruby等支持HTTP协议的编程语言均可调用。
● 非常快速,入队列、出队列速度超过10000次/秒。
● 高并发,支持上万的并发连接,C10K不成问题。
● 支持多队列。
● 单个队列支持的最大队列数量高达10亿条。
● 低内存消耗,海量数据存储,存储几十GB的数据只需不到100MB的物理内存缓冲区。
● 可以在不停止服务的情况下便捷地修改单个队列的最大队列数量。
● 可以实时查看队列状态(入队列位置、出队列位置、未读队列数量、最大队列数量)。
● 可以查看指定队列ID(队列点)的内容,包括未出、已出的队列内容。
● 查看队列内容时,支持多字符集编码。
● 源代码不超过700行,整个软件不超过900KB,适合二次开发。
httpsqs -d -p 9801 -x /opt/queue/httpsqs.db -a gwwrewsXssdf2
-a 代表需要使用密钥链接
-x 代表数据存放位置
-d 代表守护进程挂起
查询队列状态情况(普通方式):
curl “http://127.0.0.1:9801/?name=queueName&opt=status&auth=gwwrewsXssdf2”
查询队列状态情况(json方式):
curl “http://127.0.0.1:9801/?name=queueName&opt=status_json&auth=gwwrewsXssdf2”
入队:
get方式:
curl “http://127.0.0.1:9801/?name=queueName&opt=put&data=经过URL编码的文本消息&auth=gwwrewsXssdf2”
post方式:
curl -d “经过URL编码的文本消息” “http://127.0.0.1:9801/?name=queueName&opt=put&auth=gwwrewsXssdf2”
出队:
curl “http://127.0.0.1:9801/?charset=utf-8&name=queueName&opt=get&auth=gwwrewsXssdf2”
查看指定队列位置点的内容:
curl “http://127.0.0.1:9801/?charset=utf-8&name=queueName&opt=view&pos=5&auth=gwwrewsXssdf2”
重置队列:
curl “http://127.0.0.1:9801/?charset=utf-8&name=queueName&opt=reset&auth=gwwrewsXssdf2”
更改指定队列的最大数量:
curl “http://127.0.0.1:9801/?charset=utf-8&name=queueName&opt=maxqueue&num=1000000000&auth=gwwrewsXssdf2”
特征:
HTTPSQS 具有以下特征:● 非常简单,基于 HTTP GET/POST 协议。PHP、Java、Perl、Shell、Python、Ruby等支持HTTP协议的编程语言均可调用。
● 非常快速,入队列、出队列速度超过10000次/秒。
● 高并发,支持上万的并发连接,C10K不成问题。
● 支持多队列。
● 单个队列支持的最大队列数量高达10亿条。
● 低内存消耗,海量数据存储,存储几十GB的数据只需不到100MB的物理内存缓冲区。
● 可以在不停止服务的情况下便捷地修改单个队列的最大队列数量。
● 可以实时查看队列状态(入队列位置、出队列位置、未读队列数量、最大队列数量)。
● 可以查看指定队列ID(队列点)的内容,包括未出、已出的队列内容。
● 查看队列内容时,支持多字符集编码。
● 源代码不超过700行,整个软件不超过900KB,适合二次开发。
命令使用
开启制定端口号的httpsqs服务(默认端口号是1218):httpsqs -d -p 9801 -x /opt/queue/httpsqs.db -a gwwrewsXssdf2
-a 代表需要使用密钥链接
-x 代表数据存放位置
-d 代表守护进程挂起
查询队列状态情况(普通方式):
curl “http://127.0.0.1:9801/?name=queueName&opt=status&auth=gwwrewsXssdf2”
查询队列状态情况(json方式):
curl “http://127.0.0.1:9801/?name=queueName&opt=status_json&auth=gwwrewsXssdf2”
入队:
get方式:
curl “http://127.0.0.1:9801/?name=queueName&opt=put&data=经过URL编码的文本消息&auth=gwwrewsXssdf2”
post方式:
curl -d “经过URL编码的文本消息” “http://127.0.0.1:9801/?name=queueName&opt=put&auth=gwwrewsXssdf2”
出队:
curl “http://127.0.0.1:9801/?charset=utf-8&name=queueName&opt=get&auth=gwwrewsXssdf2”
查看指定队列位置点的内容:
curl “http://127.0.0.1:9801/?charset=utf-8&name=queueName&opt=view&pos=5&auth=gwwrewsXssdf2”
重置队列:
curl “http://127.0.0.1:9801/?charset=utf-8&name=queueName&opt=reset&auth=gwwrewsXssdf2”
更改指定队列的最大数量:
curl “http://127.0.0.1:9801/?charset=utf-8&name=queueName&opt=maxqueue&num=1000000000&auth=gwwrewsXssdf2”
代码使用
基类
/** * * Httpsqs4j基础类,用于设置连接信息及创建客户端对象 * */ public class Httpsqs4j { protected static String prefix; protected static String charset; public static boolean configured = false; /** * 设置连接信息 * * @param ip * @param port * @param charset * 字符集 * @param auth * @throws HttpsqsException */ public static void setConnectionInfo(String ip, int port, String charset, String auth) throws HttpsqsException { try { "".getBytes(charset); } catch (UnsupportedEncodingException e) { throw new HttpsqsException("Unknown charset.", (Throwable) e); } URL url; HttpURLConnection connection = null; String prefix = "http://" + ip + ":" + port + "/"; try { url = new URL(prefix); connection = (HttpURLConnection) url.openConnection(); connection.connect(); } catch (IOException e) { throw new HttpsqsException(prefix + " cannot located.", (Throwable) e); } finally { if (connection != null) { connection.disconnect(); } } Httpsqs4j.prefix = prefix + "?auth=" + auth + "&"; Httpsqs4j.charset = charset; Httpsqs4j.configured = true; } /** * 创建新的客户端对象 * * @return * @throws HttpsqsException */ public static HttpsqsClient createNewClient() throws HttpsqsException { if (Httpsqs4j.configured == false) { Properties properties; try { properties = loadProperties(); Httpsqs4j.setConnectionInfo(properties.getProperty("httpsqs.ip"), Integer.parseInt(properties.getProperty("httpsqs.port")), properties.getProperty("httpsqs.charset"), properties.getProperty("httpsqs.auth")); } catch (IOException e) { throw new HttpsqsException("加载配置文件异常"); } } return new HttpsqsClient(); } private static Properties loadProperties() throws IOException { Properties properties = new Properties(); properties.load(JedisPoolInstance.class.getClassLoader().getResourceAsStream("config.properties")); return properties; } }
客户端类
/** * * 客户端类,使用Httpsqs4j类的createNewClient()方法创建, * 创建前请先调用Httpsqs4j的setConnectionInfo设置连接信息 * */ public class HttpsqsClient { private long pos; protected HttpsqsClient() { } private String httpPost(String urlStr, String postData) throws HttpsqsException { return this.getSource(urlStr, postData); } private String httpGet(String urlStr) throws HttpsqsException { return this.getSource(urlStr, null); } private String getSource(String urlStr, String postData) throws HttpsqsException { HttpURLConnection connection = null; InputStream is = null; InputStreamReader isr = null; BufferedReader reader = null; OutputStream os = null; OutputStreamWriter osw = null; StringBuffer sb = new StringBuffer(); try { URL url = new URL(Httpsqs4j.prefix + urlStr); connection = (HttpURLConnection) url.openConnection(); if (postData != null) { try { connection.setDoOutput(true); os = connection.getOutputStream(); osw = new OutputStreamWriter(os); osw.write(postData); osw.flush(); da70 } catch (IOException e) { throw new HttpsqsException("Send data error.", (Throwable) e); } finally { if (osw != null) { osw.close(); } if (os != null) { os.close(); } } } is = connection.getInputStream(); isr = new InputStreamReader(is, Httpsqs4j.charset); reader = new BufferedReader(isr); String line = ""; while ((line = reader.readLine()) != null) { sb.append(line).append('\n'); } String pos = connection.getHeaderField("Pos"); if (pos != null) { this.pos = Long.valueOf(pos); } } catch (IOException e) { throw new HttpsqsException("Cannot connect to server.", (Throwable) e); } finally { if (reader != null) { try { reader.close(); } catch (IOException e) { e.printStackTrace(); } } if (isr != null) { try { isr.close(); } catch (IOException e) { e.printStackTrace(); } } if (is != null) { try { is.close(); } catch (IOException e) { e.printStackTrace(); } } if (connection != null) { connection.disconnect(); } } String sbs = sb.toString(); if (sbs.contains("HTTPSQS_ERROR")) { throw new HttpsqsException("Global error."); } return sb.toString(); } /** * 获取最后一次出入队列操作的位置值 * * @return */ public long getLastPos() { return this.pos; } /** * 获取HttpSQS状态 * * @param queueName * 队列名称 * @return * @throws HttpsqsException */ public HttpsqsStatus getStatus(String queueName) throws HttpsqsException { String urlStr = "opt=status&name=" + queueName; String source = this.httpGet(urlStr); Matcher matcher = HttpsqsStatus.pattern.matcher(source); if (matcher.find()) { HttpsqsStatus status = new HttpsqsStatus(); status.version = matcher.group(1); status.queueName = matcher.group(2); status.maxNumber = Long.parseLong(matcher.group(3)); status.getLap = Long.parseLong(matcher.group(4)); status.getPosition = Long.parseLong(matcher.group(5)); status.putLap = Long.parseLong(matcher.group(6)); status.putPosition = Long.parseLong(matcher.group(7)); status.unreadNumber = Long.parseLong(matcher.group(8)); return status; } return null; } /** * 将字符串加入队列 * * @param queueName * 队列名称 * @param str * 字符串 * @throws HttpsqsException */ public void putString(String queueName, String str) throws HttpsqsException { String urlStr = "opt=put&name=" + queueName; String source = this.httpPost(urlStr, str); if (source.contains("HTTPSQS_PUT_END")) { throw new HttpsqsException("Queue [" + queueName + "] fulled."); } else if (source.contains("HTTPSQS_PUT_ERROR")) { throw new HttpsqsException("Put data to queue [" + queueName + "] failed."); } } /** * 将字符串出队列 * * @param queueName * 队列名称 * @return * @throws HttpsqsException */ public String getString(String queueName) throws HttpsqsException { String urlStr = "opt=get&charset=" + Httpsqs4j.charset + "&name=" + queueName; String source = this.httpGet(urlStr); // if (source.contains("HTTPSQS_GET_END")) { // throw new HttpsqsException("There's no data in queue [" + queueName + // "]."); // } return source; } /** * 获取某位置的字符串 * * @param queueName * 队列名称 * @param pos * 位置 * @return * @throws HttpsqsException */ public String getStringAt(String queueName, long pos) throws HttpsqsException { if (pos < 1 || pos > 1000000000l) { throw new HttpsqsException("Pos' out of range[1 - 1000000000]."); } String urlStr = "opt=view&charset=" + Httpsqs4j.charset + "&name=" + queueName + "&pos=" + pos; return this.httpGet(urlStr); } /** * 重置队列 * * @param queueName * 队列名称 * @return * @throws HttpsqsException */ public boolean reset(String queueName) throws HttpsqsException { String urlStr = "opt=reset&name=" + queueName; String source = this.httpGet(urlStr); return source.contains("HTTPSQS_RESET_OK"); } /** * 设置最大队列数量 * * @param queueName * 队列名称 * @param number * 最大数量 * @return * @throws HttpsqsException */ public boolean setMaxNumber(String queueName, long number) throws HttpsqsException { if (pos < 10 || pos > 1000000000l) { throw new HttpsqsException("Pos' out of range[10 - 1000000000]."); } String urlStr = "opt=maxqueue&name=" + queueName + "&num=" + number; String source = this.httpGet(urlStr); return source.contains("HTTPSQS_MAXQUEUE_OK"); } }
状态标识类
public class HttpsqsStatus { /** * HttpSQS版本 */ public String version; /** * 队列名称 */ public String queueName; /** * 队列最大数量 */ public long maxNumber; /** * 当前入队位置 */ public long putPosition; /** * 当前入队圈数 */ public long putLap; /** * 当前出队位置 */ public long getPosition; /** * 当前出队圈数 */ public long getLap; /** * 当前未出队数量 */ public long unreadNumber; protected static Pattern pattern = Pattern .compile("HTTP Simple Queue Service v(.+?)\\s(?:.+?)\\sQueue Name: (.+?)\\sMaximum number of queues: (\\d+)\\sPut position of queue \\((\\d+)st lap\\): (\\d+)\\sGet position of queue \\((\\d+)st lap\\): (\\d+)\\sNumber of unread queue: (\\d+)"); }
自定义异常类
public class HttpsqsException extends Exception { private static final long serialVersionUID = 1L; public HttpsqsException() { super(); } public HttpsqsException(String message) { super(message); } public HttpsqsException(String message, Throwable cause) { super(message, cause); } }
相关文章推荐
- JSP学习经验总结(转)
- JIURL PE 格式学习总结(四)-- PE文件中的资源
- 前一段时间的学习总结
- 我的学习总结
- good,JSP学习经验总结
- 学习ejb并配置一个简单的helloEjb是遇到问题后总结的经验。
- 一些C++连接,以及个人的C++学习总结。
- VC实例学习 (1):总结下今天学习的东西
- 2001 Microsoft Tech Ed (Beijing 2001/9/7 – 2001/9/9)学习总结
- 在自己学习stuts是遇到问题后总结的经验。
- JIURL PE 格式学习总结(三)-- PE文件中的输入函数
- 结合本人学习,总结一些C#和JAVA的不同这处
- bash 学习总结
- 对前一段时间学习网络和多线程编程的总结
- JIURL PE 格式学习总结(二)-- PE文件中的输出函数
- XP方法学习总结及对小组开发的思考
- 蛙蛙推荐:蛙蛙学习asp.net总结(之一)
- JSP学习经验总结(转)
- VC学习资料收集(12):VC小知识总结
- oracle的学习小总结及其与sqlserver小区别