您的位置:首页 > 编程语言 > Java开发

Java豆瓣电影爬虫——减少与数据库交互实现批量插入

2017-02-05 20:36 671 查看
  节前一个误操作把mysql中record表和movie表都清空了,显然我是没有做什么mysql备份的。所以,索性我把所有的表数据都清空的,一夜回到解放前……

  项目地址:https://github.com/DMinerJackie/JewelCrawler

  在上一个版本中,record表存储了7万多条记录,爬取的有4万多条,但是可以明显的发现爬取的数据量越多的时候,机子就越卡。又一次报错,是有关JDBC的,还有一次机子跑卡死了。

  仔细一琢磨,上个版本的爬虫程序与数据库的读写次数太频繁,存在以下问题:

    1.程序运行,从种子地址开始,对于每次爬取的网站地址先查询数据库是否存在该条记录,如果不存在,则立即插入;

    2.当前网站地址爬取完毕后,查找数据库从中取出第一个crawled为0的记录进行爬取,每次只取一条;

    3.存储电影详情页记录以及短评数据都是采用解析一条则立即存储到数据库。

  显然,上面的这种方式是一目了然的效率低下,所以今天下午对相关代码进行改造,部分实现了批量插入,尽可能减少与数据库的交互,从而降低时空成本。

  在git clone完项目后,发现一个很诡异的现象,JewelCrawler每次都是爬取种子地址,并没有一次查询数据库中crawled字段为0的记录进行一一爬取,但是之前在本机上是完美运行的,可能是在push代码前做了改动影响运行了。

既然问题出现了,就顺着这个版本看看,最终发现问题的原因是对于种子网址并没有存储到mysql的record表中,所以在DoubanCrawler类中

//set boolean value "crawled" to true after crawling this page
sql = "UPDATE record SET crawled = 1 WHERE URL = '" + url + "'";
stmt = conn.createStatement();

if (stmt.executeUpdate(sql) > 0) {
//get the next page that has not been crawled yet
sql = "SELECT * FROM record WHERE crawled = 0";
stmt = conn.createStatement();
rs = stmt.executeQuery(sql);
if (rs.next()) {
url = rs.getString(2);
} else {
//stop crawling if reach the bottom of the list
break;
}

//set a limit of crawling count
if (count > Constants.maxCycle || url == null) {
break;
}
}


  执行stmt.executeUpdate(sql) > 0是返回的值为0,从而不会从数据库中读取crawled为0的记录,最后就一直在while的循环中爬取种子网站。

  解决方法:对于种子网站既然没有存储到record的操作,那么就对种子网站做特殊处理,将if的判断条件改为if (stmt.executeUpdate(sql) > 0 || frontPage.equals(url)),这样对于种子网站即使没有update更新成功操作仍然可以进入读取数据库crawled为0 的操作。

[b]针对第一个问题,采用批量插入操作[/b]

  实现思路:对于当前爬取的网站地址,解析网页源码,提取出所有的link,对于符合正则表达式过滤的link,将其存到一个list集合中。遍历完当前网址的所有link后,将符合条件的link批量存储到数据库中。

  具体实现如下

public static void parseFromString(String content, Connection conn) throws Exception {

Parser parser = new Parser(content);
HasAttributeFilter filter = new HasAttributeFilter("href");

String sql1 = null;
ResultSet rs1 = null;
PreparedStatement pstmt1 = null;
Statement stmt1 = null;

List<String> nextLinkList = new ArrayList<String>();

int rowCount = 0;
sql1 = "select count(*) as rowCount from record";
stmt1 = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
rs1 = stmt1.executeQuery(sql1);
if (rs1.next()) {
rowCount = rs1.getString("rowCount") != null ? Integer.parseInt(rs1.getString("rowCount")) : 0;
}

if (rowCount <= Constants.maxCycle) { //once rowCount is bigger than maxCycle, the new crawled link will not insert into record table
try {
NodeList list = parser.parse(filter);
int count = list.size();

//process every link on this page
for (int i = 0; i < count; i++) {
Node node = list.elementAt(i);

if (node instanceof LinkTag) {
LinkTag link = (LinkTag) node;
String nextLink = link.extractLink();
String mainUrl = Constants.MAINURL;

if (nextLink.startsWith(mainUrl)) {
//check if the link already exists in the database
sql1 = "SELECT * FROM record WHERE URL = '" + nextLink + "'";
stmt1 = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
rs1 = stmt1.executeQuery(sql1);
if (rs1.next()) {

} else {
Pattern moviePattern = Pattern.compile(Constants.MOVIE_REGULAR_EXP);
Matcher movieMatcher = moviePattern.matcher(nextLink);

Pattern commentPattern = Pattern.compile(Constants.COMMENT_REGULAR_EXP);
Matcher commentMatcher = commentPattern.matcher(nextLink);

if (movieMatcher.find() || commentMatcher.find()) {
nextLinkList.add(nextLink);
}
}
}
}
}
if (nextLinkList.size() > 0) {
conn.setAutoCommit(false);
//if the link does not exist in the database, insert it
sql1 = "INSERT INTO record (URL, crawled) VALUES (?,0)";
pstmt1 = conn.prepareStatement(sql1, Statement.RETURN_GENERATED_KEYS);
for (String nextLinkStr : nextLinkList) {
pstmt1.setString(1, nextLinkStr);
pstmt1.addBatch();
System.out.println(nextLinkStr);
}
pstmt1.executeBatch();
conn.commit();
}
} catch (Exception e) {
//handle the exceptions
e.printStackTrace();
System.out.println("SQLException: " + e.getMessage());
} finally {
//close and release the resources of PreparedStatement, ResultSet and Statement
if (pstmt1 != null) {
try {
pstmt1.close();
} catch (SQLException e2) {
}
}
pstmt1 = null;

if (rs1 != null) {
try {
rs1.close();
} catch (SQLException e1) {
}
}
rs1 = null;

if (stmt1 != null) {
try {
stmt1.close();
} catch (SQLException e3) {
}
}
stmt1 = null;
}
}
}


  1.通过正则匹配,找到符合条件的link,并添加到nextLinkList集合中

2.遍历完后,将数据存到数据库中

  3. 在批量操作中,使用了addBatch()方法和executeBatch()方法,注意需要添加conn.setAutoCommit(false);以及conn.commit()表示手动提交。

针对第二个问题,采用一次查询多条记录

  实现思路:将每次只查询一条记录,改为每次查询10条记录,并将这10条记录存放到list集合中,并将原来的String类型的url改为list类型的urlList传入到DouBanHttpGetUtil.getByString()方法里。这样即减少了与数据库的交互,同时也减少了对于getByString方法的调用。

  具体实现如下

public static void main(String args[]) throws Exception {

//load and read seed file
List<String> seedList = LoadSeed.loadSeed();
if (seedList == null) {
log.info("No seed to crawl, please check again");
return;
}
String frontPage = seedList.get(0);

//connect database mysql
Connection conn = DBUtils.connectDB();

//create tables to store crawled data
DBUtils.createTables();

String sql = null;
String url = frontPage;
Statement stmt = null;
ResultSet rs = null;
int count = 0;
List<String> urlList = new ArrayList<String>();
urlList.add(url);

//crawl every link in the database
while (true) {
//get page content of link "url"
DouBanHttpGetUtil.getByString(urlList, conn);
count++;

//set boolean value "crawled" to true after crawling this page

//TODO batch update
int result = 0;
conn.setAutoCommit(true);
for (String urlStr : urlList) {
sql = "UPDATE record SET crawled = 1 WHERE URL = '" + urlStr + "'";
stmt = conn.createStatement();
stmt.executeUpdate(sql);
}

urlList.clear();//empty for every loop
if (stmt.executeUpdate(sql) > 0  || frontPage.equals(url)) {
//get the next page that has not been crawled yet
sql = "SELECT * FROM record WHERE crawled = 0 limit 10";
stmt = conn.createStatement();
rs = stmt.executeQuery(sql);
while (rs.next()) {
url = rs.getString(2);
urlList.add(url);
}

//set a limit of crawling count
if (rs.next() || count > Constants.maxCycle || url == null) {
break;
}
}
}
conn.close();
conn = null;

System.out.println("Done.");
System.out.println(count);
}


  注意: 1.这里采用每次读取10条记录,相应的也需要将这10条记录的crawled字段更新为1,表示爬取过。

     2. mysql不支持top 10 * 这样的语法,但是可以通过代码中所示的limit 10 的方式取出数据。

3. 添加conn.setAutoCommit(true);表示更新操作设置为自动提交,这样就可以解决虽然程序执行成功但是数据没有更新到数据库的现象。

针对第三个问题,与第一个问题解决方法相同。

虽然不知道这样做带来的效果有多明显,或有是否有更好的解决方案,但是可以肯定的是上个版本的代码会大量占用内存并频繁与数据库交互。本人是数据库小白,希望有更好的方案可以提出来^_^



如果您觉得阅读本文对您有帮助,请点一下“推荐”按钮,您的“推荐”将是我最大的写作动力!如果您想持续关注我的文章,请扫描二维码,关注JackieZheng的微信公众号,我会将我的文章推送给您,并和您一起分享我日常阅读过的优质文章。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: