单机下进行分布式爬取数据(windows下单机模拟多机进行分布式爬虫)
2018-02-26 09:12
253 查看
URL管理器ControlNode/ URLManager.py
数据存储端:ControlNode/DataOutput.py
控制节点端ControlNode/NodeManager.py
以下部署在另一台机器上,为爬虫端
爬虫调度器SpiderNode/SpiderWork.py
HTML解析器SpiderNode/HtmlParser.py
HTML下载器SpiderNode/HtmlDownloader.py
#coding:utf-8 import cPickle import hashlib class UrlManager(object): def __init__(self): self.new_urls = self.load_progress('new_urls.txt')#未爬取URL集合 self.old_urls = self.load_progress('old_urls.txt')#已爬取URL集合 def has_new_url(self): ''' 判断是否有未爬取的URL :return: ''' return self.new_url_size()!=0 def get_new_url(self): ''' 获取一个未爬取的URL :return: ''' new_url = self.new_urls.pop() m = hashlib.md5() m.update(new_url) self.old_urls.add(m.hexdigest()[8:-8]) return new_url def add_new_url(self,url): ''' 将新的URL添加到未爬取的URL集合中 :param url:单个URL :return: ''' if url is None: return m = hashlib.md5() m.update(url) url_md5 = m.hexdigest()[8:-8] if url not in self.new_urls and url_md5 not in self.old_urls: self.new_urls.add(url) def add_new_urls(self,urls): ''' 将新的URLS添加到未爬取的URL集合中 :param urls:url集合 :return: ''' if urls is None or len(urls)==0: return for url in urls: self.add_new_url(url) def new_url_size(self): ''' 获取未爬取URL集合的s大小 :return: ''' return len(self.new_urls) def old_url_size(self): ''' 获取已经爬取URL集合的大小 :return: ''' return len(self.old_urls) def save_progress(self,path,data): ''' 保存进度 :param path:文件路径 :param data:数据 :return: ''' with open(path, 'wb') as f: cPickle.dump(data, f) def load_progress(self,path): ''' 从本地文件加载进度 :param path:文件路径 :return:返回set集合 ''' print '[+] 从文件加载进度: %s' % path try: with open(path, 'rb') as f: tmp = cPickle.load(f) return tmp except: print '[!] 无进度文件, 创建: %s' % path return set()
数据存储端:ControlNode/DataOutput.py
#coding:utf-8 import codecs import time class DataOutput(object): def __init__(self): self.filepath='baike_%s.html'%(time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime()) ) self.output_head(self.filepath) self.datas=[] def store_data(self,data): if data is None: return self.datas.append(data) if len(self.datas)>10: self.output_html(self.filepath) def output_head(self,path): ''' 将HTML头写进去 :return: ''' fout=codecs.open(path,'w',encoding='utf-8') fout.write("<html>") fout.write("<body>") fout.write("<table>") fout.close() def output_html(self,path): ''' 将数据写入HTML文件中 :param path: 文件路径 :return: ''' fout=codecs.open(path,'a',encoding='utf-8') for data in self.datas: fout.write("<tr>") fout.write("<td>%s</td>"%data['url']) fout.write("<td>%s</td>"%data['title']) fout.write("<td>%s</td>"%data['summary']) fout.write("</tr>") self.datas=[] fout.close() def ouput_end(self,path): ''' 输出HTML结束 :param path: 文件存储路径 :return: ''' fout=codecs.open(path,'a',encoding='utf-8') fout.write("</table>") fout.write("</body>") fout.write("</html>") fout.close()
控制节点端ControlNode/NodeManager.py
#coding:utf-8 from multiprocessing.managers import BaseManager import time from multiprocessing import Process, Queue from DataOutput import DataOutput from UrlManager import UrlManager class NodeManager(object): def start_Manager(self,url_q,result_q): ''' 创建一个分布式管理器 :param url_q: url队列 :param result_q: 结果队列 :return: ''' #把创建的两个队列注册在网络上,利用register方法,callable参数关联了Queue对象, # 将Queue对象在网络中暴露 BaseManager.register('get_task_queue',callable=lambda:url_q) BaseManager.register('get_result_queue',callable=lambda:result_q) #绑定端口8001,设置验证口令‘baike’。这个相当于对象的初始化 manager=BaseManager(address=('',8001),authkey='baike') #返回manager对象 return manager def url_manager_proc(self,url_q,conn_q,root_url): url_manager = UrlManager() url_manager.add_new_url(root_url) while True: while(url_manager.has_new_url()): #从URL管理器获取新的url new_url = url_manager.get_new_url() #将新的URL发给工作节点 url_q.put(new_url) print 'old_url=',url_manager.old_url_size() #加一个判断条件,当爬去2000个链接后就关闭,并保存进度 if(url_manager.old_url_size()>2000): #通知爬行节点工作结束 url_q.put('end') print '控制节点发起结束通知!' #关闭管理节点,同时存储set状态 url_manager.save_progress('new_urls.txt',url_manager.new_urls) url_manager.save_progress('old_urls.txt',url_manager.old_urls) return #将从result_solve_proc获取到的urls添加到URL管理器之间 try: if not conn_q.empty(): urls = conn_q.get() url_manager.add_new_urls(urls) except BaseException,e: time.sleep(0.1)#延时休息 def result_solve_proc(self,result_q,conn_q,store_q): while(True): try: if not result_q.empty(): content = result_q.get(True) if content['new_urls']=='end': #结果分析进程接受通知然后结束 print '结果分析进程接受通知然后结束!' store_q.put('end') return conn_q.put(content['new_urls'])#url为set类型 store_q.put(content['data'])#解析出来的数据为dict类型 else: time.sleep(0.1)#延时休息 except BaseException,e: time.sleep(0.1)#延时休息 def store_proc(self,store_q): output = DataOutput() while True: if not store_q.empty(): data = store_q.get() if data=='end': print '存储进程接受通知然后结束!' output.ouput_end(output.filepath) return output.store_data(data) else: time.sleep(0.1) pass if __name__=='__main__': #初始化4个队列 url_q = Queue() result_q = Queue() store_q = Queue() conn_q = Queue() #创建分布式管理器 node = NodeManager() manager = node.start_Manager(url_q,result_q) #创建URL管理进程、 数据提取进程和数据存储进程 url_manager_proc = Process(target=node.url_manager_proc, args=(url_q,conn_q,'http://baike.baidu.com/view/284853.htm',)) result_solve_proc = Process(target=node.result_solve_proc, args=(result_q,conn_q,store_q,)) store_proc = Process(target=node.store_proc, args=(store_q,)) #启动3个进程和分布式管理器 url_manager_proc.start() result_solve_proc.start() store_proc.start() manager.get_server().serve_forever()
以下部署在另一台机器上,为爬虫端
爬虫调度器SpiderNode/SpiderWork.py
#coding:utf-8 from multiprocessing.managers import BaseManager from HtmlDownloader import HtmlDownloader from HtmlParser import HtmlParser class SpiderWork(object): def __init__(self): #初始化分布式进程中的工作节点的连接工作 # 实现第一步:使用BaseManager注册获取Queue的方法名称 BaseManager.register('get_task_queue') BaseManager.register('get_result_queue') # 实现第二步:连接到服务器: server_addr = '127.0.0.1' print('Connect to server %s...' % server_addr) # 端口和验证口令注意保持与服务进程设置的完全一致: self.m = BaseManager(address=(server_addr, 8001), authkey='baike') # 从网络连接: self.m.connect() # 实现第三步:获取Queue的对象: self.task = self.m.get_task_queue() self.result = self.m.get_result_queue() #初始化网页下载器和解析器 self.downloader = HtmlDownloader() self.parser = HtmlParser() print 'init finish' def crawl(self): while(True): try: if not self.task.empty(): url = self.task.get() if url =='end': print '控制节点通知爬虫节点停止工作...' #接着通知其它节点停止工作 self.result.put({'new_urls':'end','data':'end'}) return print '爬虫节点正在解析:%s'%url.encode('utf-8') content = self.downloader.download(url) new_urls,data = self.parser.parser(url,content) self.result.put({"new_urls":new_urls,"data":data}) except EOFError,e: print "连接工作节点失败" return except Exception,e: print e print 'Crawl fali ' if __name__=="__main__": spider = SpiderWork() spider.crawl()
HTML解析器SpiderNode/HtmlParser.py
#coding:utf-8 import re import urlparse from bs4 import BeautifulSoup class HtmlParser(object): def parser(self,page_url,html_cont): ''' 用于解析网页内容抽取URL和数据 :param page_url: 下载页面的URL :param html_cont: 下载的网页内容 :return:返回URL和数据 ''' if page_url is None or html_cont is None: return soup = BeautifulSoup(html_cont,'html.parser',from_encoding='utf-8') new_urls = self._get_new_urls(page_url,soup) new_data = self._get_new_data(page_url,soup) return new_urls,new_data def _get_new_urls(self,page_url,soup): ''' 抽取新的URL集合 :param page_url: 下载页面的URL :param soup:soup :return: 返回新的URL集合 ''' new_urls = set() #抽取符合要求的a标签 # 原书代码 # links = soup.find_all('a', href=re.compile(r'/view/\d+\.htm')) #2017-07-03 更新,原因百度词条的链接形式发生改变 links = soup.find_all('a',href=re.compile(r'/item/.*')) for link in links: #提取href属性 new_url = link['href'] #拼接成完整网址 new_full_url = urlparse.urljoin(page_url,new_url) new_urls.add(new_full_url) return new_urls def _get_new_data(self,page_url,soup): ''' 抽取有效数据 :param page_url:下载页面的URL :param soup: :return:返回有效数据 ''' data={} data['url']=page_url title = soup.find('dd',class_='lemmaWgt-lemmaTitle-title').find('h1') data['title']=title.get_text() summary = soup.find('div',class_='lemma-summary') #获取到tag中包含的所有文版内容包括子孙tag中的内容,并将结果作为Unicode字符串返回 data['summary']=summary.get_text() return data
HTML下载器SpiderNode/HtmlDownloader.py
#coding:utf-8 import requests class HtmlDownloader(object): def download(self,url): if url is None: return None user_agent = 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)' headers={'User-Agent':user_agent} r = requests.get(url,headers=headers) if r.status_code==200: r.encoding='utf-8' return r.text return None
相关文章推荐
- 利用OpenDataSource、OPENROWSET进行分布式查询和数据的导入导出
- 利用OpenDataSource、OPENROWSET进行分布式查询和数据的导入导出
- 基于Python实现微信公众号爬虫进行数据分析
- 面向对象的方式进行数据交换网络之间的差异--无缝切换的发展到单机游戏C/S模式
- 基于Mongodb进行分布式数据存储
- 第三百六十七节,Python分布式爬虫打造搜索引擎Scrapy精讲—elasticsearch(搜索引擎)scrapy写入数据到elasticsearch中
- Python爬虫(二)——对开封市58同城出租房数据进行分析
- 基于Mongodb进行分布式数据存储
- Oracle进行模拟测试数据的一个例子
- 没有dev-server.js文件,如何进行后台数据模拟?
- windows下hadoop的单机伪分布式部署(1)
- 如何获取摄像头的控制指令数据,进行模拟发送控制
- 【利用OpenDataSource、OPENROWSET进行分布式查询和数据的导入导出】
- 基于MongoDB进行分布式数据存储的步骤
- 在windows下对android进行实时快速录屏和模拟点击(一)——使用adb命令行
- (大数据)转载:Windows下单机安装Spark开发环境
- 使用Git在Mac和Windows系统之间进行同步数据
- Oracle进行模拟测试数据的一个例子
- 【转载】Fiddler进行模拟Post提交json数据,总为null解决方式
- 没有dev-server.js文件,如何进行后台数据模拟?