您的位置:首页 > 数据库 > Mongodb

Mongodb总结5-通过装饰模式,用Mongodb解决Hbase的不稳定问题

2015-10-14 16:02 573 查看
最近继续学习Mongodb的根本原因,是为了解决今天的问题。
项目中用到了Hbase,生产环境服务器用了3台,但是不够稳定,每2天左右,就连不上了。
重启就好了,当然,这是一个历史遗留问题。
我在想,是不是连接没有关闭,每次都是建立新的连接?
瞅瞅Java访问Hbase的代码,都close了额。

原来的Hbase,用Java访问,有add/update、get、getList3个接口。
现在要加上Mongodb存储,尽可能保证Hbase和Mongodb数据同步。
优先使用Mongodb中的数据,其次才使用HBase中的数据。

今后有可能不再使用Hbase。

在项目刚刚启动的时候,需要同步HBase中的数据到Mongodb。
简化代码如下
public class ProjectDetailClient {
private ProjectDetailHbaseClient hbase = new ProjectDetailHbaseClient();
private ProjectDetailMongodbClient mongodb = new ProjectDetailMongodbClient();

// 2个都增加
public void add(ProjectDetail projectDetail) {
}


可以这么理解,原来直接使用ProjectDetailHbaseClient,方法名称都一样。
后台增加了ProjectDetailMongodbClient,方法的实现也一样,可以看作是一套接口的2套实现。
ProjectDetailClient的add等具体方法中,会处理2个接口的调用、数据同步等逻辑问题。

完整代码如下

package com.hanhai.zrb.api.mongodb;

import java.util.List;

import org.apache.log4j.Logger;

import casia.isiteam.zrb.hbase.client.ProjectDetailHbaseClient;

import com.hanhai.zrb.model.project.ProjectDetail;

public class ProjectDetailClient {
private ProjectDetailHbaseClient hbase = new ProjectDetailHbaseClient();
private ProjectDetailMongodbClient mongodb = new ProjectDetailMongodbClient();
private Logger log = Logger.getLogger(getClass());

// 2个都增加
public void add(ProjectDetail projectDetail) {
log.info("Add ProjectDetail for hbase.");
hbase.insertProjectDetail(projectDetail);
log.info("Add ProjectDetail for mongodb.");
mongodb.add(projectDetail);
}

// 2个都更新
public void update(ProjectDetail projectDetail) {
if (projectDetail.getId() == null) {
log.error("ProjectDetail is is null,Cantnot update~");
return;
}
Long id = projectDetail.getId();

ProjectDetail one = mongodb.get(id);
// Mongodb,如果存在,更新
if (one != null) {
log.info("Update ProjectDetail,Mongodb exists,id="+id);
mongodb.update(projectDetail);
}
// 不存在,就增加
else {
log.info("Update ProjectDetail,Mongodb not exists,id="+id);
mongodb.add(projectDetail);
}

// hbase增加和更新是同一个接口
log.info("Update ProjectDetail for hbase,id="+id);
hbase.insertProjectDetail(projectDetail);
}

// 2个都查询,优先使用Mongodb
public ProjectDetail get(long id) {
ProjectDetail one = null;
ProjectDetail hbaseOne = hbase.getProjectDetail(id);
ProjectDetail mongodbOne = mongodb.get(id);
if (mongodbOne != null) {
one = mongodbOne;
log.info("Project Detail,Mongodb exists,Use Mongodb," + one);
} else if (hbaseOne != null) {
one = hbaseOne;
log.info("Project Detail,Mongodb not exists,Use Hbase," + one);
log.info("Add Project Detail To Mongodb");
// 同步Hbase中的数据到Mongodb
mongodb.add(hbaseOne);
}
return one;
}

// 2个都查询,优先使用Mongodb
public List<ProjectDetail> getProjectInfoBasic(List<Long> idList) {
List<ProjectDetail> list = null;
List<ProjectDetail> hbaseList = hbase.getProjectInfoBasic(idList);
List<ProjectDetail> mongodbList = mongodb.getProjectInfoBasic(idList);
// 优先使用Mongodb中的,条件,Mongodb中的个数不小于hbase中的
if (mongodbList != null) {
int size = mongodbList.size();
if (hbaseList == null || hbaseList.size() <= size) {
list = mongodbList;
log.info("ProjectDetail list,Use MongodbList,size=" + size);
}else{
list = hbaseList;
log.info("ProjectDetail list,Use HbaseList,size=" + hbaseList.size()+",mongodb count "+size+" < hbase count "+hbaseList.size());
}
}
// 其次使用Hbase中的,不会同步hbase中的数据到Mongodb
else if (hbaseList != null) {
list = hbaseList;
log.info("ProjectDetail list,Use HbaseList,size=" + hbaseList.size());
}
return list;
}
}

package com.hanhai.zrb.api.mongodb;

import java.util.ArrayList;
import java.util.List;

import org.apache.log4j.Logger;

import com.hanhai.zrb.model.project.ProjectDetail;
import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import com.mongodb.CommandResult;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.WriteResult;

public class ProjectDetailMongodbClient {

public static final String CON_NAME = "projectDetail";

private Logger log = Logger.getLogger(getClass());

public void add(ProjectDetail projectDetail) {
DBCollection con = getCon();
add(con, projectDetail);
}

private DBCollection getCon() {
DB db = MongoUtil.db();
DBCollection con = db.getCollection(CON_NAME);
return con;
}

// 增加
private DBCollection add(DBCollection projectDetailCollection,
ProjectDetail projectDetail) {
DBObject object = BeanUtil.bean2DBObject(projectDetail);
WriteResult wr = projectDetailCollection.insert(object);
CommandResult cr = wr.getLastError();
log.info("Add new projectDetail,result:" + cr);
return projectDetailCollection;
}

public void update(ProjectDetail projectDetail) {
DBCollection con = getCon();
update(con, projectDetail);
}

// 修改
private void update(DBCollection collection, ProjectDetail projectDetail) {
if (projectDetail.getId() == null) {
log.error("Update projectDetail,must have a unique id");
return;
}

BasicDBObject updateCondition = new BasicDBObject();
updateCondition.append("id", projectDetail.getId());

DBObject newObject = BeanUtil.bean2DBObject(projectDetail);

DBObject updateSetValue = new BasicDBObject("$set", newObject);
WriteResult wr = collection.update(updateCondition, updateSetValue);
log.info("Update new projectDetail,result:" + wr);
}

public ProjectDetail get(long id) {
DBCollection con = getCon();
ProjectDetail projectDetail = findById(con, id);
return projectDetail;
}

// 从集合中,根据ID查找
private ProjectDetail findById(DBCollection collection, Long id) {
BasicDBObject searchProjectDetailById = new BasicDBObject();
searchProjectDetailById.append("id", id);
ProjectDetail projectDetailBefore = null;
// findOne方法更简单一些
DBCursor cursor = collection.find(searchProjectDetailById);
while (cursor.hasNext()) {
DBObject articleObject = cursor.next();
if (articleObject != null) {
projectDetailBefore = objectToArticle(articleObject);
String internalId = articleObject.get("_id").toString();
projectDetailBefore.setMongoId(internalId);
}
}
cursor.close();
return projectDetailBefore;
}

// 对象转换
private ProjectDetail objectToArticle(DBObject object) {
ProjectDetail projectDetail = new ProjectDetail();
// 用工具方法转换,手动转换,需要判断类型,比较麻烦
projectDetail = BeanUtil.dbObject2Bean(object, projectDetail);
return projectDetail;
}

public List<ProjectDetail> getProjectInfoBasic(List<Long> idList) {
DBCollection con = getCon();
List<ProjectDetail> list = findByIdList(con, idList);
return list;
}

// 根据ID集合查找
private List<ProjectDetail> findByIdList(DBCollection collection,
List<Long> idList) {
BasicDBList values = new BasicDBList();
values.addAll(idList);

DBObject inQuery = new BasicDBObject("$in", values);

DBObject con = new BasicDBObject();
con.put("id", inQuery);
DBCursor cursorIdArray = collection.find(con);

List<ProjectDetail> projectDetailList = new ArrayList<ProjectDetail>();
while (cursorIdArray.hasNext()) {
DBObject articleObject = cursorIdArray.next();
ProjectDetail projectDetail = new ProjectDetail();
BeanUtil.dbObject2Bean(articleObject, projectDetail);
String mongoId = articleObject.get("_id").toString();
projectDetail.setMongoId(mongoId);

projectDetailList.add(projectDetail);
}
return projectDetailList;
}

}


ProjectDetailHbaseClient代码较为复杂,和ProjectDetailMongodbClient类似,不再贴了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: