您的位置:首页 > 数据库 > Mongodb

我的架构演化笔记 13:自己动手写Nutch-index插件导入数据到MongoDB

2014-07-18 00:00 495 查看
需求:编写一个 Nutch 索引(IndexWriter)插件,把索引数据导入到 MongoDB。

1 创建目录

在 $NUTCH_HOME/src/plugin 目录中创建插件目录:
mkdir indexer-mongodb


2 创建文件

# Run from src/plugin, where the indexer-mongodb directory was just created.
cd indexer-mongodb
# Plugin build/dependency/descriptor files; their contents are filled in by the following steps.
touch build.xml
touch ivy.xml
touch plugin.xml
# Java sources live under src/java/<package path>.
mkdir -p src/java/org/apache/nutch/indexwriter/mongodb
cd  src/java/org/apache/nutch/indexwriter/mongodb
touch MongoDBDataWriter.java
touch MongoDBConstants.java
至此,插件所需的目录和(空)文件都已创建好,下面逐个填写它们的内容。


3 准备build.xml

<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements.  See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License.  You may obtain a copy of the License at
 http://www.apache.org/licenses/LICENSE-2.0 
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- Ant build for the indexer-mongodb plugin. This file defines no targets
     of its own: the default "jar-core" target (and the "deploy" target that
     src/plugin/build.xml invokes) come from the shared ../build-plugin.xml. -->
<project name="indexer-mongodb" default="jar-core">

<import file="../build-plugin.xml" />

</project>


4 准备ivy.xml

<?xml version="1.0" ?>

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements.  See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License.  You may obtain a copy of the License at
 http://www.apache.org/licenses/LICENSE-2.0 
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<!-- Ivy dependency descriptor for the indexer-mongodb plugin. -->
<ivy-module version="1.0">
<info organisation="org.apache.nutch" module="${ant.project.name}">
<license name="Apache 2.0"/>
<ivyauthor name="Apache Nutch Team" url="http://nutch.apache.org"/>
<description>
Apache Nutch
</description>
</info>

<configurations>
<include file="../../..//ivy/ivy-configurations.xml"/>
</configurations>

<publications>
<!--get the artifact from our module name-->
<artifact conf="master"/>
</publications>

<!-- MongoDB Java driver used by MongoDBDataWriter. Choose the rev that matches
     your MongoDB server version, and keep it in sync with the driver jar name
     listed in plugin.xml (mongo-java-driver-2.12.2.jar). -->
<dependencies>
<dependency org="org.mongodb" name="mongo-java-driver" rev="2.12.2" conf="*->default"/>
</dependencies>

</ivy-module>

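注意:这里声明的是插件需要的驱动包。对于 MongoDB 来说,应选择与你的 MongoDB 数据库版本相匹配的 mongo-java-driver 版本;同时 plugin.xml 中 <library> 声明的 jar 文件名(如 mongo-java-driver-2.12.2.jar)必须与这里的 rev 保持一致。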


5 准备plugin.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements.  See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License.  You may obtain a copy of the License at
 http://www.apache.org/licenses/LICENSE-2.0 
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- Plugin descriptor: the id must match the name used in plugin.includes
     (indexer-mongodb) for Nutch to load this plugin. -->
<plugin id="indexer-mongodb" name="MongoDBDataWriter" version="1.0.0"
provider-name="nutch.apache.org">

<!-- Jars on the plugin's classpath: the plugin's own classes (all exported)
     and the MongoDB driver, whose version must match the rev in ivy.xml. -->
<runtime>
<library name="indexer-mongodb.jar">
<export name="*" />
</library>

<library name="mongo-java-driver-2.12.2.jar"/>
</runtime>

<requires>
<import plugin="nutch-extensionpoints" />
</requires>

<!-- Registers MongoDBDataWriter as an implementation of the
     org.apache.nutch.indexer.IndexWriter extension point. -->
<extension id="org.apache.nutch.indexer.mongodb"
name="MongoDB Data Writer"
point="org.apache.nutch.indexer.IndexWriter">
<implementation id="MongoDBDataWriter"
class="org.apache.nutch.indexwriter.mongodb.MongoDBDataWriter" />
</extension>

</plugin>


6 编写MongoDBDataWriter.java

package org.apache.nutch.indexwriter.mongodb;

import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.nutch.indexer.IndexWriter;
import org.apache.nutch.indexer.NutchDocument;
import org.apache.nutch.indexwriter.mongodb.*;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.mapred.JobConf;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.MongoClient;
import com.mongodb.ServerAddress;
import com.mongodb.WriteConcern;
import com.mongodb.WriteResult;

public class MongoDBDataWriter implements IndexWriter {
public static Logger LOG = LoggerFactory.getLogger(MongoDBDataWriter.class);

private int maxInsertDocs = 20;
private int maxDeleteDocs = 20;

private static int ACTION_INSERT = 1;
private static int ACTION_DELETE = 2;
private static int ACTION_ALL = 3;
private int action;

private Configuration config;
private String host_port;
private String database;
private String collection;

private MongoClient mongoClient;

private ArrayList<BasicDBObject> toBeUpsertedDocs;
private ArrayList<String> toBeDeletedDocs;

public MongoDBDataWriter() {
LOG.info("new MongoDBDataWriter() invoked---");
}

private void setup() {
MongoClient client = null;
try {
List list = new ArrayList();
String[] array = this.host_port.split(",");
int index = 0;
for (index = 0; index < array.length; index++) {
String str = array[index];
String[] ip_port = str.split(":");
list.add(new ServerAddress(ip_port[0], Integer
.parseInt(ip_port[1])));
}
if (list.size() > 0) {
client = new MongoClient(list);
}

} catch (Exception e) {
LOG.info(e.toString());
client = null;
}
mongoClient = client;
}

@Override
public void open(JobConf job, String name) {

toBeUpsertedDocs = new ArrayList<BasicDBObject>();
toBeDeletedDocs = new ArrayList<String>();
LOG.info(this.describe());
this.setup();
if (null != this.mongoClient) {
LOG.info("open for MongoClient succeed...");
} else {
LOG.info("open for MongoClient fail...");
}

}

@Override
public void write(NutchDocument doc) {
LOG.info(this.toString() + "write *** invoked");
BasicDBObject bo = new BasicDBObject();
Collection<String> fields = doc.getFieldNames();
for (String key : fields) {
//just use the first value...
String strValue = doc.getField(key).getValues().get(0).toString();
try{
int intValue = Integer.parseInt(strValue);
bo.put(key, intValue);
}catch(Exception e){
bo.put(key, strValue);
}
}
this.toBeUpsertedDocs.add(bo);
if (this.toBeUpsertedDocs.size() >= this.maxInsertDocs) {
action = ACTION_INSERT;
this.commit();
}

}

@Override
public void update(NutchDocument doc) {
LOG.info(this.toString() + "update *** invoked");
// won't be called In Nutch 1.7
write(doc);
}

@Override
public void delete(String key) {
LOG.info(this.toString() + "delete *** invoked");
this.toBeDeletedDocs.add(key);
if (this.toBeDeletedDocs.size() >= this.maxDeleteDocs) {
action = ACTION_DELETE;
this.commit();
}

}

@Override
public void commit() {
if (null != this.mongoClient) {
DB db = this.mongoClient.getDB(this.database);
if (null != db) {
DBCollection dbCollection = db.getCollection(this.collection);
if (null != dbCollection) {
if (0 != (action & ACTION_INSERT)
&& this.toBeUpsertedDocs.size() > 0) {
for( BasicDBObject obj:this.toBeUpsertedDocs){
dbCollection.save(obj, WriteConcern.SAFE);
}
//WriteResult result = dbCollection.insert(
// this.toBeUpsertedDocs, WriteConcern.SAFE);
this.toBeUpsertedDocs.clear();
}
if (0 != (action & ACTION_DELETE)
&& this.toBeDeletedDocs.size() > 0) {
BasicDBObject in = new BasicDBObject();
in.put("$in", this.toBeDeletedDocs);
BasicDBObject remove = new BasicDBObject();
remove.put("_id", in);
dbCollection.remove(remove, WriteConcern.SAFE);
this.toBeDeletedDocs.clear();
}

}
}
}

LOG.info(this.toString() + "commit *** invoked");
}

@Override
public void close() {
action = ACTION_ALL;
this.commit();

if (null != mongoClient) {
mongoClient.close();
mongoClient = null;
}

LOG.info("close the mongoclient ok!!!");
}

protected void finalize( )
{
// finalization code here
this.close();
}

@Override
public String describe() {
LOG.info(this.toString() + "describe *** invoked");
StringBuffer sb = new StringBuffer("MongoDB Data Writer\n");
sb.append("\t").append(MongoDBConstants.HOST_PORT).append(" : ")
.append(this.host_port);
sb.append("\t").append(MongoDBConstants.DATABASE).append(" : ")
.append(this.database);
sb.append("\t").append(MongoDBConstants.COLLECTION).append(" : ")
.append(this.collection);
sb.append("\t").append(MongoDBConstants.MAX_INSERT_DOCS).append(" : ")
.append(this.maxInsertDocs);
sb.append("\t").append(MongoDBConstants.MAX_DELETE_DOCS).append(" : ")
.append(this.maxDeleteDocs);
return sb.toString();
}

@Override
public void setConf(Configuration conf) {
LOG.info(this.toString() + "setconf *** invoked");
config = conf;
host_port = config.get(MongoDBConstants.HOST_PORT);
database = config.get(MongoDBConstants.DATABASE);
collection = config.get(MongoDBConstants.COLLECTION);
maxInsertDocs=Integer.parseInt(config.get(MongoDBConstants.MAX_INSERT_DOCS));
maxDeleteDocs=Integer.parseInt(config.get(MongoDBConstants.MAX_DELETE_DOCS));
}

@Override
public Configuration getConf() {
LOG.info(this.toString() + "getconf *** invoked");
return config;
}
}


7 编写MongoDBConstants.java

package org.apache.nutch.indexwriter.mongodb;

/**
 * Configuration property names read by {@code MongoDBDataWriter} from the Nutch configuration
 * (for example {@code conf/nutch-site.xml}). Every key starts with {@link #MONGODB_PREFIX}.
 *
 * <p>Constants holder only; it cannot be instantiated or subclassed.
 */
public final class MongoDBConstants {
  /** Common prefix of all MongoDB writer properties. */
  public static final String MONGODB_PREFIX = "mongo.";

  /** Comma-separated list of {@code host:port} MongoDB servers. */
  public static final String HOST_PORT = MONGODB_PREFIX + "host_port";
  /** Name of the MongoDB database to write to. */
  public static final String DATABASE = MONGODB_PREFIX + "database";
  /** Name of the collection inside {@link #DATABASE} that receives the documents. */
  public static final String COLLECTION = MONGODB_PREFIX + "collection";
  /** Number of buffered documents that triggers a batch of saves. */
  public static final String MAX_INSERT_DOCS = MONGODB_PREFIX + "max.insert.docs";
  /** Number of buffered delete keys that triggers a batch removal. */
  public static final String MAX_DELETE_DOCS = MONGODB_PREFIX + "max.delete.docs";

  private MongoDBConstants() {
    // constants holder; not instantiable
  }
}

8 修改src/plugin/build.xml

找到
<!-- ====================================================== -->
<!-- Build & deploy all the plugin jars.                    -->
<!-- ====================================================== -->
在下面添加一行

<!-- Build and deploy the MongoDB indexer plugin together with the other plugins. -->
<ant dir="indexer-mongodb" target="deploy"/>


9 编译插件

# Rebuild Nutch; the plugin is included via the <ant dir="indexer-mongodb"> line added above.
# NOTE(review): presumably run from the Nutch source root ($NUTCH_HOME); confirm.
ant


10 启用indexer-mongodb插件

修改 conf/nutch-site.xml 中的 plugin.includes 属性,
将其中的 indexer-solr 替换为 indexer-mongodb。

添加下列参数到conf/nutch-site.xml
<!-- Comma-separated host:port list of MongoDB servers. Keep the value on one
     line without surrounding whitespace so every host:port entry parses cleanly. -->
<property>
<name>mongo.host_port</name>
<value>192.168.56.254:27017,192.168.56.66:27018</value>
</property>
<!-- Target database and collection for the indexed documents. -->
<property><name>mongo.database</name><value>nutch</value></property>
<property><name>mongo.collection</name><value>doc</value></property>
<!-- Number of buffered documents / delete keys that triggers a write to MongoDB. -->
<property><name>mongo.max.insert.docs</name><value>30</value></property>
<property><name>mongo.max.delete.docs</name><value>30</value></property>


11 执行索引命令

# Run the indexing job. This is one command: every continued line must end with a backslash,
# otherwise the shell executes each line separately.
./bin/nutch index \
  ./data/crawldb/ \
  -linkdb ./data/linkdb/ \
  -dir ./data/segments/ \
  -deleteGone -filter -normalize -noCommit


12 附加说明:索引插件的流程



13 效果图
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  架构 Nutch MongoDB 插件