Elasticsearch and MongoDB Data Sync and Distributed Cluster Setup (Part 1)
2015-08-13 10:14
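Through Rivers, Elasticsearch can sync with many data sources: Wikipedia, MongoDB, CouchDB, RabbitMQ, RSS, Sofa, JDBC, FileSystem, Dropbox and others. Our company's business runs on MongoDB, so today I configured Elasticsearch-to-MongoDB sync on a virtual machine in the test environment. This is a rough record of the process, which mainly relies on richardwilly98/elasticsearch-river-mongodb.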
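The river syncs data by reading MongoDB's oplog. The oplog is the collection MongoDB uses to keep the members of a cluster in sync, and reading it keeps the data in ES consistent with the data in MongoDB. This is why MongoDB has to run as a cluster. Note: the plugin only supports a clustered MongoDB (a replica set), because only a clustered MongoDB has an oplog.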
Elasticsearch and MongoDB must be installed in matching versions for the sync to work. I used the latest Elasticsearch 1.4.2 and MongoDB 3.0.0; see the table below for the version requirements.
[Image: elasticsearch-river-mongodb version compatibility table]
MongoDB here runs as a replica-set cluster. I won't go into how to build the replica set, and I'll also skip installing and configuring Elasticsearch.
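Before going further, it is worth confirming the two prerequisites above: matching versions and an oplog on the MongoDB side. Below is a bash sketch, assuming the mongo shell and curl are on PATH and MongoDB has no authentication enabled; the helper names es_version, mongo_version and has_oplog are hypothetical. On a replica set the oplog lives in the local.oplog.rs collection, which is what has_oplog looks for.

```shell
#!/usr/bin/env bash
# Pre-flight checks before creating the river (hypothetical helpers, not part of the plugin).
# Assumes the legacy mongo shell and curl are on PATH and MongoDB runs without auth.

# Print the Elasticsearch version reported by the root endpoint of ES_URL.
es_version() {
  curl -s "$1/" | sed -n 's/.*"number"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p'
}

# Print the MongoDB server version of HOST PORT.
mongo_version() {
  mongo --host "$1" --port "$2" --quiet --eval 'print(db.version())'
}

# Succeed only if HOST PORT has a local.oplog.rs collection, i.e. is a replica-set member.
has_oplog() {
  local out
  out=$(mongo --host "$1" --port "$2" --quiet \
    --eval 'print(db.getSiblingDB("local").getCollectionNames().indexOf("oplog.rs") >= 0)')
  [ "$out" = "true" ]
}
```

Usage: es_version http://10.253.1.70:9200, mongo_version 10.253.1.71 27017, and has_oplog 10.253.1.71 27017 && echo "oplog found". For the setup described in this post you would expect the versions to match the table above; a standalone mongod makes has_oplog fail.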
1. Install elasticsearch-river-mongodb
# ./elasticsearch-1.4.4/bin/plugin -install elasticsearch/elasticsearch-mapper-attachments/2.4.1
# ./elasticsearch-1.4.4/bin/plugin -i com.github.richardwilly98.elasticsearch/elasticsearch-river-mongodb/2.0.5
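Elasticsearch loads plugins when a node starts, so restart it after installing. To confirm the plugins are present, the 1.x plugin script can list what is installed. A small bash sketch, assuming ES_HOME points at the directory used in the commands above; plugin_installed is a hypothetical helper, and the short names the plugins are listed under (for example river-mongodb, mapper-attachments) are my assumption, so check the actual --list output.

```shell
#!/usr/bin/env bash
# Check that a plugin appears in "bin/plugin --list" (Elasticsearch 1.x plugin script).
# ES_HOME defaults to the directory used in the install commands above.
ES_HOME="${ES_HOME:-./elasticsearch-1.4.4}"

plugin_installed() {
  # -F: match the name literally; -q: only the exit status matters
  "$ES_HOME/bin/plugin" --list | grep -qF -- "$1"
}
```

Usage: plugin_installed river-mongodb && plugin_installed mapper-attachments && echo "both plugins installed".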
2. Create the river
curl -XPUT "http://10.253.1.70:9200/_river/threads_mongo_river/_meta" -d'
{
"type": "mongodb",
"mongodb": {
"servers":
[
{ "host": "10.253.1.71", "port": 27017}
],
"db": "threads",
"collection": "threads",
"gridfs": false
},
"index": {
"name": "test",
"type": "threads"
}
}'
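The same request can be scripted so the hosts and names are parameters. A bash sketch under the same setup; river_meta and create_river are hypothetical helper names, and the JSON body mirrors the one above (only the values become arguments).

```shell
#!/usr/bin/env bash
# river_meta MONGO_HOST MONGO_PORT DB COLLECTION INDEX TYPE
# Print the minimal river _meta document used above.
river_meta() {
  local mongo_host="$1" mongo_port="$2" db="$3" coll="$4" index="$5" type="$6"
  cat <<EOF
{
  "type": "mongodb",
  "mongodb": {
    "servers": [ { "host": "$mongo_host", "port": $mongo_port } ],
    "db": "$db",
    "collection": "$coll",
    "gridfs": false
  },
  "index": { "name": "$index", "type": "$type" }
}
EOF
}

# create_river ES_URL RIVER_NAME <river_meta arguments...>
# PUT the document to /_river/<RIVER_NAME>/_meta; "-d @-" makes curl read the body from stdin.
create_river() {
  local es_url="$1" river_name="$2"
  shift 2
  river_meta "$@" | curl -s -XPUT "$es_url/_river/$river_name/_meta" -d @-
}
```

Usage: create_river http://10.253.1.70:9200 threads_mongo_river 10.253.1.71 27017 threads threads test threads. The stored settings can be read back with curl "http://10.253.1.70:9200/_river/threads_mongo_river/_meta".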
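This only configures the basics: the MongoDB server to connect to, the db and collection, and the corresponding Elasticsearch index and type. Many other settings, such as options, were not used here and can be configured according to your business needs. Below is a detailed configuration sample; the ${...} values are placeholders that must be replaced with real values (strings in JSON quotes) before the request is run: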
$ curl -XPUT "localhost:9200/_river/${es.river.name}/_meta" -d '
{
"type": "mongodb",
"mongodb": {
"servers":
[
{ "host": ${mongo.instance1.host}, "port": ${mongo.instance1.port} },
{ "host": ${mongo.instance2.host}, "port": ${mongo.instance2.port} }
],
"options": {
"secondary_read_preference" : true,
"drop_collection": ${mongo.drop.collection},
"exclude_fields": ${mongo.exclude.fields},
"include_fields": ${mongo.include.fields},
"include_collection": ${mongo.include.collection},
"import_all_collections": ${mongo.import.all.collections},
"initial_timestamp": {
"script_type": ${mongo.initial.timestamp.script.type},
"script": ${mongo.initial.timestamp.script}
},
"skip_initial_import" : ${mongo.skip.initial.import},
"store_statistics" : ${mongo.store.statistics}
},
"credentials":
[
{ "db": "local", "user": ${mongo.local.user}, "password": ${mongo.local.password} },
{ "db": "admin", "user": ${mongo.db.user}, "password": ${mongo.db.password} }
],
"db": ${mongo.db.name},
"collection": ${mongo.collection.name},
"gridfs": ${mongo.is.gridfs.collection},
"filter": ${mongo.filter}
},
"index": {
"name": ${es.index.name},
"throttle_size": ${es.throttle.size},
"bulk_size": ${es.bulk.size},
"type": ${es.type.name},
"bulk": {
"actions": ${es.bulk.actions},
"size": ${es.bulk.size},
"concurrent_requests": ${es.bulk.concurrent.requests},
"flush_interval": ${es.bulk.flush.interval}
}
}
}'
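Because the template above is not valid JSON until its ${...} tokens are filled in, one option is to substitute them mechanically. A bash sketch, assuming the request body is kept in a file; render_template is a hypothetical helper (the river plugin does no templating itself), and each value must already be valid JSON, so string values carry their own quotes.

```shell
#!/usr/bin/env bash
# Replace ${placeholder} tokens in a river config template with concrete values.
# render_template is a hypothetical helper; the river plugin does no templating itself.
# Usage: render_template "$template" key=value [key=value ...]
render_template() {
  local tpl="$1" kv key val token
  shift
  for kv in "$@"; do
    key=${kv%%=*}          # text before the first '='
    val=${kv#*=}           # text after the first '=' (may itself contain '=')
    token="\${$key}"       # the literal token, e.g. ${mongo.db.name}
    # Quoted pattern and replacement: match literally, keep '&' literal on bash >= 5.2
    tpl=${tpl//"$token"/"$val"}
  done
  # Warn if any placeholder was left unfilled
  if [[ $tpl == *'${'* ]]; then
    echo "warning: unreplaced placeholders remain" >&2
  fi
  printf '%s\n' "$tpl"
}
```

Usage, with river-template.json as a hypothetical file holding the JSON body: render_template "$(cat river-template.json)" 'mongo.db.name="threads"' 'es.index.name="test"' ... | curl -XPUT "localhost:9200/_river/threads_mongo_river/_meta" -d @-. The river name in the URL is filled in directly, since it is not part of the body.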
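Some of the settings are explained below; see the GitHub wiki for details:
db: name of the database to sync
host: MongoDB IP address (default localhost)
port: MongoDB port
collection: name of the collection to sync
fields: fields to sync (comma-separated; all fields by default)
gridfs: whether the collection holds GridFS files (set to true if it does)
local_db_user: username for the local database (omit if there is none)
local_db_password: password for the local database (omit if there is none)
db_user: username for the database being synced (omit if there is none)
db_password: password for the database being synced (omit if there is none)
name: index name (must not already exist)
type: type name
bulk_size: maximum number of documents per bulk request
bulk_timeout: timeout for bulk requests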
3. Test whether it works
My test database holds very little data, so I simply queried everything to see whether the documents show up:
$ curl -XGET "http://10.253.1.70:9200/test/threads/_search?pretty"
{
"took": 20,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 4,
"max_score": 1,
"hits": [
{
"_index": "test",
"_type": "threads",
"_id": "54fa32b22c44cf67cb6a9d1b",
"_score": 1,
"_source": {
"_id": "54fa32b22c44cf67cb6a9d1b",
"title": "where is my car",
"content": "ask yourself"
}
},
{
"_index": "test",
"_type": "threads",
"_id": "54fa2f5c2c44cf67cb6a9d19",
"_score": 1,
"_source": {
"_id": "54fa2f5c2c44cf67cb6a9d19",
"title": "this is title",
"content": "what is the fuck"
}
},
{
"_index": "test",
"_type": "threads",
"_id": "54fa2f892c44cf67cb6a9d1a",
"_score": 1,
"_source": {
"_id": "54fa2f892c44cf67cb6a9d1a",
"title": "are you ok",
"content": "yes,i am ok"
}
},
{
"_index": "test",
"_type": "threads",
"_id": "54fa49ccc104e2264e02deea",
"_score": 1,
"_source": {
"_id": "54fa49ccc104e2264e02deea",
"title": "hello word",
"content": "hello hello haha"
}
}
]
}
}
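The data has been synced. Next, add a record in MongoDB and run the same query again: if the new record shows up, or hits.total has gone up by 1, the sync is working.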
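The "total goes up by 1" check can be automated with the _count endpoint, polling for a short while because new documents only become searchable after the next index refresh (1 second by default). A bash sketch, assuming the ES node and index/type from this post; extract_count and wait_for_count are hypothetical helper names.

```shell
#!/usr/bin/env bash
# Poll the _count API until the index holds the expected number of documents.
# Helper names are hypothetical; _count itself is a standard Elasticsearch endpoint.
ES_URL="${ES_URL:-http://10.253.1.70:9200}"

# Read a _count response on stdin and print its "count" value.
extract_count() {
  sed -n 's/.*"count"[[:space:]]*:[[:space:]]*\([0-9][0-9]*\).*/\1/p'
}

# wait_for_count INDEX TYPE EXPECTED [TRIES]; checks once per second.
wait_for_count() {
  local index="$1" type="$2" expected="$3" tries="${4:-30}" n="" i=0
  while [ "$i" -lt "$tries" ]; do
    n=$(curl -s "$ES_URL/$index/$type/_count" | extract_count)
    if [ "$n" = "$expected" ]; then
      return 0
    fi
    i=$((i + 1))
    sleep 1
  done
  echo "count is ${n:-unknown}, expected $expected" >&2
  return 1
}
```

Usage: with 4 documents already indexed, insert one more on the MongoDB side, for example mongo 10.253.1.71:27017/threads --eval 'db.threads.insert({title: "sync test", content: "hello"})' (a hypothetical test document), then run wait_for_count test threads 5.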