我的架构演化笔记 13:自己动手写Nutch-index插件导入数据到MongoDB
2014-07-18 00:00
495 查看
需求是导入数据到MongoDB.
8 修改src/plugin/build.xml
13效果图
1 创建目录
在 $Nutch_home/src/plugin 目录中创建插件文件夹:mkdir indexer-mongodb
2 创建文件
cd indexer-mongodb
touch build.xml
touch ivy.xml
touch plugin.xml
mkdir -p src/java/org/apache/nutch/indexwriter/mongodb
cd src/java/org/apache/nutch/indexwriter/mongodb
touch MongoDBDataWriter.java
touch MongoDBConstants.java
至此,所有相关的文件都准备好了。
3 准备build.xml
<?xml version="1.0"?> <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> <project name="indexer-mongodb" default="jar-core"> <import file="../build-plugin.xml" /> </project>
4 准备ivy.xml
<?xml version="1.0" ?> <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> <ivy-module version="1.0"> <info organisation="org.apache.nutch" module="${ant.project.name}"> <license name="Apache 2.0"/> <ivyauthor name="Apache Nutch Team" url="http://nutch.apache.org"/> <description> Apache Nutch </description> </info> <configurations> <include file="../../..//ivy/ivy-configurations.xml"/> </configurations> <publications> <!--get the artifact from our module name--> <artifact conf="master"/> </publications> <dependencies> <dependency org="org.mongodb" name="mongo-java-driver" rev="2.12.2" conf="*->default"/> </dependencies> </ivy-module> 注意,这里写上你需要的驱动包,对于MongoDB来说,就是你的MongoDB数据库对应的版本的驱动包。
5 准备plugin.xml
<?xml version="1.0" encoding="UTF-8"?> <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> <plugin id="indexer-mongodb" name="MongoDBDataWriter" version="1.0.0" provider-name="nutch.apache.org"> <runtime> <library name="indexer-mongodb.jar"> <export name="*" /> </library> <library name="mongo-java-driver-2.12.2.jar"/> </runtime> <requires> <import plugin="nutch-extensionpoints" /> </requires> <extension id="org.apache.nutch.indexer.mongodb" name="MongoDB Data Writer" point="org.apache.nutch.indexer.IndexWriter"> <implementation id="MongoDBDataWriter" class="org.apache.nutch.indexwriter.mongodb.MongoDBDataWriter" /> </extension> </plugin>
6 编写MongoDBDataWriter.java
package org.apache.nutch.indexwriter.mongodb;

import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.nutch.indexer.IndexWriter;
import org.apache.nutch.indexer.NutchDocument;
import org.apache.nutch.indexwriter.mongodb.*;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.mapred.JobConf;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.MongoClient;
import com.mongodb.ServerAddress;
import com.mongodb.WriteConcern;
import com.mongodb.WriteResult;

/**
 * Nutch {@link IndexWriter} that stores indexed documents in a MongoDB
 * collection.
 *
 * <p>Documents and delete keys are buffered in memory and flushed when the
 * configured batch size is reached, when {@link #commit()} is called, or when
 * the writer is closed. Connection settings are read in
 * {@link #setConf(Configuration)} from the keys in {@link MongoDBConstants}.
 *
 * <p>The writer relies on the caller invoking {@link #close()}; it does not
 * use a finalizer, because flushing to the network from the finalizer thread
 * is unreliable.
 *
 * <p>This class keeps unsynchronized mutable buffers and is not safe for
 * concurrent use.
 */
public class MongoDBDataWriter implements IndexWriter {

  public static final Logger LOG = LoggerFactory.getLogger(MongoDBDataWriter.class);

  /** Batch size used when {@code mongo.max.insert.docs} is not configured. */
  private static final int DEFAULT_MAX_INSERT_DOCS = 20;
  /** Batch size used when {@code mongo.max.delete.docs} is not configured. */
  private static final int DEFAULT_MAX_DELETE_DOCS = 20;
  /** Port used for a {@code host_port} entry that names only a host. */
  private static final int DEFAULT_PORT = 27017;

  private int maxInsertDocs = DEFAULT_MAX_INSERT_DOCS;
  private int maxDeleteDocs = DEFAULT_MAX_DELETE_DOCS;

  private Configuration config;
  private String host_port;
  private String database;
  private String collection;
  private MongoClient mongoClient;

  // Initialised eagerly so write()/delete()/commit() cannot hit a null buffer
  // when they are called before open().
  private List<BasicDBObject> toBeUpsertedDocs = new ArrayList<BasicDBObject>();
  private List<String> toBeDeletedDocs = new ArrayList<String>();

  public MongoDBDataWriter() {
    LOG.info("new MongoDBDataWriter() invoked---");
  }

  /**
   * Builds the {@link MongoClient} from the comma-separated
   * {@code host:port} list in {@link #host_port}.
   *
   * <p>Each entry is trimmed, so whitespace around the configured value or
   * around the separators is tolerated. An entry without a port falls back to
   * {@link #DEFAULT_PORT}. On any failure the client is left {@code null} and
   * the cause is logged with its stack trace.
   */
  private void setup() {
    MongoClient client = null;
    try {
      List<ServerAddress> seeds = new ArrayList<ServerAddress>();
      if (this.host_port != null) {
        for (String rawEntry : this.host_port.split(",")) {
          String entry = rawEntry.trim();
          if (entry.length() == 0) {
            continue;
          }
          int colon = entry.lastIndexOf(':');
          String host = colon < 0 ? entry : entry.substring(0, colon).trim();
          int port = colon < 0
              ? DEFAULT_PORT
              : Integer.parseInt(entry.substring(colon + 1).trim());
          seeds.add(new ServerAddress(host, port));
        }
      }
      if (seeds.isEmpty()) {
        LOG.error("No MongoDB server address configured in {}",
            MongoDBConstants.HOST_PORT);
      } else {
        client = new MongoClient(seeds);
      }
    } catch (Exception e) {
      // Boundary catch: address parsing and driver construction can fail in
      // several ways and open() reports the outcome; keep the stack trace.
      LOG.error("Cannot create MongoClient for '" + this.host_port + "'", e);
      client = null;
    }
    mongoClient = client;
  }

  /**
   * Resets the buffers and connects to MongoDB.
   *
   * @param job  the job configuration (unused)
   * @param name the output name (unused)
   */
  @Override
  public void open(JobConf job, String name) {
    toBeUpsertedDocs = new ArrayList<BasicDBObject>();
    toBeDeletedDocs = new ArrayList<String>();
    LOG.info(this.describe());
    this.setup();
    if (null != this.mongoClient) {
      LOG.info("open for MongoClient succeed...");
    } else {
      LOG.error("open for MongoClient fail...");
    }
  }

  /**
   * Buffers a document for saving; flushes the buffer once it holds
   * {@code maxInsertDocs} documents.
   *
   * <p>Only the first value of every field is stored. A value that parses as
   * an {@code int} is stored as an integer, anything else as a string. Fields
   * without a value are skipped.
   *
   * <p>NOTE(review): documents are saved without an explicit {@code _id}
   * while {@link #delete(String)} matches on {@code _id}; presumably the
   * delete keys therefore never match documents written here - confirm
   * against the data before relying on deletion.
   *
   * @param doc the document to store
   */
  @Override
  public void write(NutchDocument doc) {
    LOG.debug("{} write *** invoked", this);
    BasicDBObject bo = new BasicDBObject();
    Collection<String> fields = doc.getFieldNames();
    for (String key : fields) {
      List<?> values = doc.getField(key).getValues();
      if (values == null || values.isEmpty() || values.get(0) == null) {
        continue;
      }
      // just use the first value...
      String strValue = values.get(0).toString();
      try {
        bo.put(key, Integer.valueOf(strValue));
      } catch (NumberFormatException ignored) {
        // Not an integer: store the textual form.
        bo.put(key, strValue);
      }
    }
    this.toBeUpsertedDocs.add(bo);
    if (this.toBeUpsertedDocs.size() >= this.maxInsertDocs) {
      flushUpserts();
    }
  }

  /**
   * Treated exactly like {@link #write(NutchDocument)}.
   *
   * @param doc the document to store
   */
  @Override
  public void update(NutchDocument doc) {
    LOG.debug("{} update *** invoked", this);
    write(doc);
  }

  /**
   * Buffers a key for deletion; flushes the buffer once it holds
   * {@code maxDeleteDocs} keys.
   *
   * @param key the {@code _id} value of the document to remove
   */
  @Override
  public void delete(String key) {
    LOG.debug("{} delete *** invoked", this);
    this.toBeDeletedDocs.add(key);
    if (this.toBeDeletedDocs.size() >= this.maxDeleteDocs) {
      flushDeletes();
    }
  }

  /**
   * Flushes every buffered document and every buffered delete key.
   *
   * <p>Both buffers are always flushed, so a commit triggered by the
   * framework never leaves pending work behind.
   */
  @Override
  public void commit() {
    flushUpserts();
    flushDeletes();
    LOG.info(this.toString() + "commit *** invoked");
  }

  /** Flushes pending work and releases the MongoDB connection. */
  @Override
  public void close() {
    try {
      this.commit();
    } finally {
      // Release the connection even when the final flush fails.
      if (null != mongoClient) {
        mongoClient.close();
        mongoClient = null;
      }
    }
    LOG.info("close the mongoclient ok!!!");
  }

  /**
   * Describes the writer and its effective settings.
   *
   * @return a human-readable, tab-separated summary
   */
  @Override
  public String describe() {
    StringBuilder sb = new StringBuilder("MongoDB Data Writer\n");
    sb.append("\t").append(MongoDBConstants.HOST_PORT).append(" : ")
        .append(this.host_port);
    sb.append("\t").append(MongoDBConstants.DATABASE).append(" : ")
        .append(this.database);
    sb.append("\t").append(MongoDBConstants.COLLECTION).append(" : ")
        .append(this.collection);
    sb.append("\t").append(MongoDBConstants.MAX_INSERT_DOCS).append(" : ")
        .append(this.maxInsertDocs);
    sb.append("\t").append(MongoDBConstants.MAX_DELETE_DOCS).append(" : ")
        .append(this.maxDeleteDocs);
    return sb.toString();
  }

  /**
   * Reads the MongoDB settings from the configuration.
   *
   * <p>String settings are trimmed. Missing batch sizes fall back to the
   * built-in defaults instead of failing.
   *
   * @param conf the configuration to read; may be {@code null}, in which case
   *             the current settings are kept
   */
  @Override
  public void setConf(Configuration conf) {
    LOG.debug("{} setconf *** invoked", this);
    config = conf;
    if (conf == null) {
      return;
    }
    host_port = trimOrNull(conf.get(MongoDBConstants.HOST_PORT));
    database = trimOrNull(conf.get(MongoDBConstants.DATABASE));
    collection = trimOrNull(conf.get(MongoDBConstants.COLLECTION));
    maxInsertDocs = conf.getInt(MongoDBConstants.MAX_INSERT_DOCS,
        DEFAULT_MAX_INSERT_DOCS);
    maxDeleteDocs = conf.getInt(MongoDBConstants.MAX_DELETE_DOCS,
        DEFAULT_MAX_DELETE_DOCS);
  }

  /** @return the configuration passed to {@link #setConf(Configuration)} */
  @Override
  public Configuration getConf() {
    LOG.debug("{} getconf *** invoked", this);
    return config;
  }

  /** Returns the trimmed value, or {@code null} when the input is null. */
  private static String trimOrNull(String value) {
    return value == null ? null : value.trim();
  }

  /** Returns the target collection, or {@code null} when not connected. */
  private DBCollection targetCollection() {
    if (null == this.mongoClient) {
      return null;
    }
    DB db = this.mongoClient.getDB(this.database);
    return null == db ? null : db.getCollection(this.collection);
  }

  /** Saves all buffered documents and empties the buffer. */
  private void flushUpserts() {
    if (this.toBeUpsertedDocs.isEmpty()) {
      return;
    }
    DBCollection dbCollection = targetCollection();
    if (null == dbCollection) {
      // Without a connection the documents can never be written; drop them
      // loudly rather than letting the buffer grow without bound.
      LOG.error("No MongoDB connection, dropping {} buffered document(s)",
          this.toBeUpsertedDocs.size());
      this.toBeUpsertedDocs.clear();
      return;
    }
    for (BasicDBObject obj : this.toBeUpsertedDocs) {
      dbCollection.save(obj, WriteConcern.SAFE);
    }
    this.toBeUpsertedDocs.clear();
  }

  /** Removes all documents whose {@code _id} is buffered, then empties the buffer. */
  private void flushDeletes() {
    if (this.toBeDeletedDocs.isEmpty()) {
      return;
    }
    DBCollection dbCollection = targetCollection();
    if (null == dbCollection) {
      LOG.error("No MongoDB connection, dropping {} buffered delete(s)",
          this.toBeDeletedDocs.size());
      this.toBeDeletedDocs.clear();
      return;
    }
    BasicDBObject in = new BasicDBObject();
    in.put("$in", new ArrayList<String>(this.toBeDeletedDocs));
    BasicDBObject remove = new BasicDBObject();
    remove.put("_id", in);
    dbCollection.remove(remove, WriteConcern.SAFE);
    this.toBeDeletedDocs.clear();
  }
}
7 编写MongoDBConstants.java
package org.apache.nutch.indexwriter.mongodb;

/**
 * Names of the configuration properties read by the MongoDB index writer.
 * Every key starts with {@link #MONGODB_PREFIX}.
 */
public class MongoDBConstants {

  /** Common prefix of all MongoDB writer properties. */
  public static final String MONGODB_PREFIX = "mongo.";

  /** Key of the property holding the server address list. */
  public static final String HOST_PORT = MONGODB_PREFIX + "host_port";

  /** Key of the property holding the database name. */
  public static final String DATABASE = MONGODB_PREFIX + "database";

  /** Key of the property holding the collection name. */
  public static final String COLLECTION = MONGODB_PREFIX + "collection";

  /** Key of the property holding the insert batch size. */
  public static final String MAX_INSERT_DOCS = MONGODB_PREFIX + "max.insert.docs";

  /** Key of the property holding the delete batch size. */
  public static final String MAX_DELETE_DOCS = MONGODB_PREFIX + "max.delete.docs";
}
8 修改src/plugin/build.xml
找到下面这段注释:
<!-- ====================================================== -->
<!-- Build & deploy all the plugin jars. -->
<!-- ====================================================== -->
在它下面添加一行:
<ant dir="indexer-mongodb" target="deploy"/>
9 编译插件
ant
10 启用indexer-mongodb插件
修改 nutch-site.xml 内的 plugin.includes,将其中的 indexer-solr 替换成 indexer-mongodb。
然后添加下列参数到 conf/nutch-site.xml(注意 value 内不要留多余的空格):
<property><name>mongo.host_port</name><value>192.168.56.254:27017,192.168.56.66:27018</value></property>
<property><name>mongo.database</name><value>nutch</value></property>
<property><name>mongo.collection</name><value>doc</value></property>
<property><name>mongo.max.insert.docs</name><value>30</value></property>
<property><name>mongo.max.delete.docs</name><value>30</value></property>
11执行索引命令
./bin/nutch index ./data/crawldb/ -linkdb ./data/linkdb/ -dir ./data/segments/ -deleteGone -filter -normalize -noCommit
12附加说明:索引插件的流程
相关文章推荐
- MongoDB学习笔记(5)--数据导入导出mongoexport
- javascript及php笔记:自己动手写一个ajax异步上传文件的jquery插件
- MongoDB学习笔记-数据导入Excel文件
- MongoDB学习笔记2--MongoDB数据库中数据的导入
- 我的架构演化笔记 4 :MongoDB 增加集群机制
- 我的架构演化笔记 12:Nutch1.7 构建互联网爬虫
- Nutch学习笔记13---以某网站为例写解析插件
- 自己动手“数据恢复编程、数据恢复软件开发”- NTFS扫描恢复通用库
- 自己动手写iPhone wap浏览器之界面架构篇
- 自己动手编写jQuery滚动新闻(jQuery News Ticker)插件
- 笔记3--测试其中汇编程序流程【自己动手操作系统】
- 自己动手写操作系统--笔记之安装DOS
- 【学习笔记】自己动手写操作系统--pmtest1
- 手把手教你自己动手恢复坏道硬盘数据
- cassandra学习笔记5--使用Binary Memtable将大量数据导入Cassandra
- 自己动手写一个Memory cache 来缓存主数据提高性能
- [导入]自己动手,实现jQuery中的ImageCopper.
- 每月3亿PV的FaceBook朋友买卖插件的架构与数据
- 每月3亿PV的FaceBook朋友买卖插件的架构与数据
- 自己动手写操作系统学习笔记