您的位置:首页 > 数据库 > Mongodb

MongoDB上MapReduce的实现以及项目实例探索

2018-01-12 15:35 543 查看

0x01 数据存储结构

前段时间发现公司原来的一个业务,使用mysql分月创建表记录用户登陆记录信息,在表数量达到千万级后,查询变得超级慢,并且时不时的卡死终端,为了解决这个问题,没有选择花大力气去优化,而是选择了使用MongoDB数据库去存储,记过几个月的数据存储,数据量早已达到五千万级别,但查询速度依然很快。

MongoDB存储了大量的用户登录记录,文档结构如下:

{
"_id" : ObjectId("5a1fde257756dca1c86f2d23"),
"_class" : "com.aaa.LoginHistory",
"uid" : "94989242",
"ip" : "192.168.1.219",
"imei" : "865736037366641",
"epid" : "33189023-9472-4f30-81cf-8c7d13132aae5",
"platform" : "Android",
"timestamp" : "1512037117100",
"address" : "中国,江苏省,南京市,建邺区"
}


0x02 初试Aggregate

现在有个需求,需要统计每一天(yyyy-MM-dd)的登录用户数(uid对应用户,每天存在多次登陆记录,即uid可能重复)。

由于技术菜,选择了比较low的方法实现:根据需要查询的时间间隔的时间,就是开始时间戳和结束时间戳,把它分成每天一个间隔,然后利用以下方法去统计:

db.loginHistory.aggregate([
{
$match:{
"timestamp":{"$lte":"1513412162000","$gte":"1512548162000"}
}

},
{
$group:{
_id:{"uid":"$uid"},
"count":{"$avg":1}
}
},
{
$group:{
_id:"统计",
"count":{"$sum":1}
}
}
])


可以得到某个时间间隔内的uid数量:

{ "_id" : "统计", "count" : 30003 }


[b]对应的java代码实现[/b]

开始时间

long startTime = startCalendar.getTimeInMillis();


结束时间

long endTime = endCalendar.getTimeInMillis();


获取集合对象

DBCollection dbCollection = mongoTemplate.getCollection("loginHistory");


match document 根据开始结束时间过滤

DBObject match = new BasicDBObject("$match",
new BasicDBObject("timestamp",new BasicDBObject("$gte",startTime+"").append("$lte",endTime+""));
);


group1 document 根据uid统计,每个uid的数量取平均值即当天登录次数去重

DBObject group1 = new BasicDBObject("$group",
new BasicDBObject("_id",new BasicDBObject("uid","$uid"))
.append("count",new BasicDBObject("$avg",1))
);


group2 document 把上一个group的结果统计,得到该时间段内的登录用户数

DBObject group2 = new BasicDBObject("$group",
new BasicDBObject("_id","$count")
.append("count",new BasicDBObject("$sum",1))
);


执行管道统计

AggregationOutput aggregationOutput = dbCollection.aggregate(match,group1,group2);


结果获取

for (DBObject obj : aggregationOutput.results()) {
try {
String json = obj.toString();
if (json.startsWith("{") && json.endsWith("}")) {
JSONObject jsonObject = new JSONObject(json);
long count = jsonObject.getLong("count");
LoginHistoryCountModel loginHistoryCount = new LoginHistoryCountModel(null,dateIn
4000
t,epid,
platform != null?platform.toLowerCase():platform,count);
loginHistoryCountList.add(loginHistoryCount);
}
} catch (JSONException e) {
e.printStackTrace();
}
}


但这样的查询有很大的问题:

1. 如果需要查询一年以内的话,则需分成365个时间区不断得循环调用,导致时间开销超级大。

最后经过各种百度找到了一种可能性的实现,通过MongoDB内置函数$dateToString实现时间戳格式化。

db.loginHistory.aggregate(
[
{
$match:{
timestamp:{
$lte:"1513399696240",
$gte:"1513389696240"
}
}
},
{
$project:{
uid:1,
timestamp:1,
_id:0
}
},
{
$group:{
_id:{
uid:"$uid",
date:{
$dateToString:{
format:"%Y-%m-%d",
date:{
$add:[new Date(0),parseInt("$timestamp")]
}
}
}
}
}
},
{
$group:{
_id:{
date:"$_id.date"
},
count:{"$sum":1}
}
}
]
)


但是问题又来了,执行报错:

assert: command failed: {
"ok" : 0,
"errmsg" : "$dateToString is only defined on year 0-9999, tried to use year 292278994",
"code" : 18537
} : aggregate failed
_getErrorWithCode@src/mongo/shell/utils.js:25:13
doassert@src/mongo/shell/assert.js:13:14
assert.commandWorked@src/mongo/shell/assert.js:287:5
DBCollection.prototype.aggregate@src/mongo/shell/collection.js:1312:5


百思不得其解,翻百度翻stackoverflow,找到了一个解释,在add中是不允许使用timestamp这样的变量的。

尝试性的把parseInt(“$timestamp”)改成parseInt(“1513399696240”),执行得到结果:

{ "_id" : { "date" : "2017-12-16" }, "count" : 19 }


这就验证了上面的说法,无法使用变量。

0x03 使用MapReduce

经过v2上的大哥提建议说用mapReduce,尝试着改成了MapReduce,MapReduce结构原理如下:(盗个图,哈哈)



query:筛选出我们需要的数据

map:把数据整理成key-value关联的数据emit

emit:把key对应的values组成一个数组

reduce:做最后的操作,自定义核心统记分析算法所在

实现map:

function(){
var date = new Date(parseInt(this.timestamp));
var dateKey = date.getFullYear()+"-"+(date.getMonth()+1)+"-"+date.getDate();
emit(dateKey,{uid:this.uid,count:1});
}


上述第一行
var date = new Date(parseInt(this.timestamp));
实现把我们存在数据库中的字符串时间戳转化为64位int,且新建一个该时间戳的Date对象。

第二行
var dateKey = date.getFullYear()+"-"+(date.getMonth()+1)+"-"+date.getDate();
实质上是取出该时间的年、月、日,如:yyyy-MM-dd。

最后执行emit函数发射出去。

实现reduce:

function(key, values){
var uids = {};
for (var i = 0;i < values.length; i++) {
uids[values[i]] = 1;
}
var result = 0;
for (var i in uids) {
result++;
}
return {count:result};
}


参数values是经过emit处理得到的key对应的value的数组集。

函数前四行实现了当天uid的去重,进而实现当天同用户多条登陆记录只算1。

后五行实现去重后的统计,得到当天的用户登陆数。

query:

query:{
timestamp:{"$gte":"1512894400000","$lte":"1513180799999"},
epid:"cc74d30d-fdce-4aea-8fdb-b23198706383"},
out:"haha"
}


查询时间timestamp大于等于1512894400000 and 小于等于1513180799999 and epid等于cc74d30d-fdce-4aea-8fdb-b23198706383 的所有数据传入到map,reduce结果输出到haha集合。

db.loginHistory.mapReduce(
function(){
var date = new Date(parseInt(this.timestamp));
var key = date.getFullYear()+"-"+(date.getMonth()+1)+"-"+date.getDate();
var value = this.uid;
emit(key,value);
},
function(key, values){ var uids = {}; for (var i = 0;i < values.length; i++) { uids[values[i]] = 1; } var result = 0; for (var i in uids) { result++; } return {count:result}; },
{
query:{ timestamp:{"$gte":"1512894400000","$lte":"1513180799999"}, epid:"cc74d30d-fdce-4aea-8fdb-b23198706383"}, out:"haha" }
}
).find()


最后实现了快速统计的MapReduce,也入门了大数据。。。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐