您的位置:首页 > Web前端 > Node.js

我的node之kafka实践

2017-02-14 10:14 323 查看
http://m.blog.csdn.net/article/details?id=51281427

转自

最近公司基于微信做了一个微信发红包的工具,此工具需要对接授权给微信公众号,然后通过关键词实现发红包

一、整体架构

系统主要构成模块如下:

一、微信转发Middle服务模块(负责接受用户关键词等请求转发到对应的前端服务器)

1)、微信转发server首先查询当前用户是否已经有过一次域名授权转发,若从在则直接获取之前的域名进行转发

二、前端+微信授权模块服务(可同时支持多个微信公众号授权,一个用户只能授权一种微信公众号token+openid)

1)、通过微信转发server回复给用户对应公众号的图文消息带有授权公众号的域名

2)、授权成功保存用户信息,同时保存openid在session中

3)、同一个用户只能抢一次,一旦抢过,则在session中记录用户抢记录,避免同一用户多次点击抢红包按钮,减少后端请求

4)、对于参与过抽奖的用户,二次之后的请求结果页面直接缓存,减少不必要的请求

三、红包模块(不同公众号对应创建一个红包队列)

1)、同一个公众号只能创建一个红包队列

2)、红包可进行须发

四、发放红包模块

1)、通过对应的用户授权公众号token区分不同微信支付

五、通用授权公众号hash配置

1)、通过授权域名做为键,对应的值则为公众号的基本信息、支付配置信息、授权转发域名等

六、消息日志记录采用kafka+zookeeper

单台服务器最高同时在线人数达到5000的并发写,发红包的记录通过脚本每五分钟从kafka中拉去消费数据,保存到数据库,做为后期的数据分析

以下为node发送消息和读取消息例子:

producer:

var kafka = require(‘kafka-node’);

var Producer = kafka.Producer;

var KeyedMessage = kafka.KeyedMessage;

var Client = kafka.Client;

var client = new Client(‘localhost:2181’);

var argv = {

topic: “topic1”

};

var topic = argv.topic || ‘topic10’;

var p = argv.p || 0;

var a = argv.a || 0;

var producer = new Producer(client, {

requireAcks: 1

});

producer.on(‘ready’, function() {

var args = {

appid: ‘wx238c28839a133d0e’,

createTime: ‘ddd’,

toUserName: ‘wx238c28839a133d0e’,

fromUserName: ‘wx238c28839a133d0e’

};

// var keyedMessage = new KeyedMessage(‘keyed’, ‘a keyed message’);

producer.send([{

topic: topic,

partition: p,

messages: [JSON.stringify(args)],

attributes: a

}], function(err, result) {

console.log(err || result);

process.exit();

});

//create topics
// producer.createTopics(['t1'], function (err, data) {
//     console.log(data);
// });


});

consumer:

‘use strict’;

var kafka = require(‘kafka-node’);

var events = require(‘events’);

var emitter = new events.EventEmitter();

//var HighLevelConsumer = kafka.HighLevelConsumer;

var Consumer = kafka.Consumer;

var Offset = kafka.Offset;

var Client = kafka.Client;

var argv = {

topic: “hongbao_xxx”

};

var topic = argv.topic || ‘topic1’;

var client = new Client(‘localhost:2181’);

var topics = [{

topic: topic,

partition: 0,

offset: 8000

}],

options = {

autoCommit: false,

fetchMaxWaitMs: 1000,

fetchMaxBytes: 1024 * 1024,

fromOffset: true

};

var consumer = new Consumer(client, topics, options);

var offset = new Offset(client);

//consumer.setOffset(topic, 0, 36);

consumer.on(‘message’, function(message) {

var obj = message;

var message = JSON.parse(message.value);

var args = [];

args.push(message.openId);

args.push(message.fromUserName);

args.push(message.toUserName);

args.push(message.money);

args.push(message.attach);

args.push(message.appId);

args.push(message.cTime);

emitter.emit(‘load’, args);

});

consumer.on(‘error’, function(err) {

console.log(‘error’, err);

});

emitter.on(‘load’, function(args) {

console.log(‘listener2’, args);

// function insert(args) {
//     middleconsumer.saveRecord(args)
//         .then(function(data) {
//             insert(args);
//         }, function(er) {

//         });
// }
// insert(args);


});

/*

* If consumer get
offsetOutOfRange
event, fetch data from the smallest(oldest) offset

*/

consumer.on(‘offsetOutOfRange’, function(topic) {

topic.maxNum = 2;

offset.fetch([topic], function(err, offsets) {

var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);

consumer.setOffset(topic.topic, topic.partition, min);
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: