利用Python和Redis构建一个免费的代理池
2017-11-27 18:14
519 查看
当我们使用爬虫大量重复的请求一个网站时,我们可能回去到封IP的情况,这时候我们就需要使用代理来伪装我们的IP,使之请求能够再次发起
代理IP的获取:python爬虫
代理池的存储:redis数据库
代理IP检测:构建请求,检测是否可用
调度器:负责统筹调度以上各功能的执行
在run方法中我开了两个进程,分别启动test_proxy和get_proxy方法,方法中再具体调用ProxyGetter类和VaildTest类来完成具体功能的操作
因为涉及到redis数据库的操作,所以要为这两个进程加锁。cycle参数为两个进程运行的周期
这个类主要就是爬取各大代理网站,然后把取得的IP写入到数据库中,没有什么难的
在这里因为要构建大量请求,所以我使用了grequests类来异步构建请求,grequests的具体用法请百度
以上就是代理池的具体架构和实现思路
代理池的整体构造
代理IP来源:各大代理IP网站代理IP的获取:python爬虫
代理池的存储:redis数据库
代理IP检测:构建请求,检测是否可用
调度器:负责统筹调度以上各功能的执行
调度器:Scheduler类
调度器本身不具有任何具体功能,他只是负责调用已经存在的类和方法,来完成代理的抓取和检测from Myself.config import * from Myself.getter import ProxyGetter from Myself.tester import VaildTest from Myself.db import RedisClient import time import multiprocessing from multiprocessing import Process ''' 调度器 调用tester来检测IP是否可用 调用getter来从各大网站获取代理IP ''' class Scheduler(): #获取代理的方法,传入lock锁住数据库的操作,cycle为周期时间 def get_proxy(self,lock,cycle=GET_PROXY_CYCLE): while True: lock.acquire() conn = RedisClient() #具体的操作由ProxyGetter类发起 getter =ProxyGetter(conn) print('正在获取IP') try: #执行检测程序 getter.run() except Exception as e: print('程序出了点小异常', e.args) finally: #释放锁并且休眠程序,等待周期结束 lock.release() time.sleep(cycle) #检测代理的方法,传入lock锁住数据库的操作,cycle为周期时间 def test_proxy(self,lock,cycle=TEST_PROXY_CYCLE): while True: lock.acquire() conn = RedisClient() # 具体的操作由RedisClient类发起 tester = VaildTest(conn) print('正在检测IP') try: #执行检测程序 tester.valid_test() except Exception as e: print('程序出了点小异常',e.args) finally: # 释放锁并且休眠程序,等待周期结束 lock.release() time.sleep(cycle) def run(self): #获取锁的对象 lock = multiprocessing.Lock() if VALID_TEST_PROCESS: valid_test_process = Process(target=self.test_proxy,args=(lock,)) valid_test_process.start() if GET_PROXY_PROCESS: get_proxy_process = Process(target=self.get_proxy,args=(lock,)) get_proxy_process.start() if __name__=='__main__': test=Scheduler() test.run()
在run方法中我开了两个进程,分别启动test_proxy和get_proxy方法,方法中再具体调用ProxyGetter类和VaildTest类来完成具体功能的操作
因为涉及到redis数据库的操作,所以要为这两个进程加锁。cycle参数为两个进程运行的周期
代理IP的获取:ProxyGetter类
import requests from bs4 import BeautifulSoup from pyquery import PyQuery as pq import bs4 from Myself.db import RedisClient ''' 这个类为获取代理IP的类 ''' class ProxyGetter(object): # 数据库对象必须要以参数的形式传入,否则多进程的时候会报错(不能序列化_thread.lock对象)。 # 多进程操作数据库必须要注意进程锁问题 def __init__(self,conn): self.headers={ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.71 Safari/537.36', 'Accept-Encoding': 'gzip, deflate, sdch', 'Accept-Language': 'zh-CN,zh;q=0.8' } self.conn=conn ''' 此方法用来获取网页源代码 ''' def get_page(self,url,charset='utf-8',options={}): headers=dict(self.headers,**options) r = requests.get(url, headers=headers,timeout=3) r.encoding=charset try: print('Getting result:',url,r.status_code) if r.status_code==200: return r.text except ConnectionError: print('Crawling Failed', url,r.status_code) ''' 此代理网站暂时无法使用 ''' def crawl_66(self,page_count): start_url = 'http://www.66ip.cn/{}.html' urls=[start_url.format(page) for page in range(1,page_count) ] for url in urls: html=self.get_page(url,charset='gb2312') soup=BeautifulSoup(html,'lxml') table=soup.select('table[bordercolor="#6699ff"]')[0] trs=table.find_all('tr')[1:-1] for out in trs: ip=out.find_all('td')[0].string port=out.find_all('td')[1].string yield{ 'scheme':'http', 'proxy':':'.join([ip,port]) } ''' 解析网页源代码,并且获取代理IP和端口 page_count为爬取的页数 ''' def crawl_goubanjia(self,page_count): start_url = 'http://www.goubanjia.com/free/gngn/index{}.shtml' #按照页数来构造多个网页 urls = [start_url.format(page) for page in range(1, page_count)] #遍历每个网页 for url in urls: html = self.get_page(url) if html: doc = pq(html) trs = doc('table.table tr') for tr in trs.items(): td = tr.find('td.ip') td.find('p').remove() proxy = td.text().replace(' ', '') scheme = tr.find('td:nth-child(3)').text() if ',' in scheme: scheme = scheme.split(',')[0] if scheme and proxy: #构造迭代器并返回 yield { 'scheme': 'http', 'proxy': proxy } def crawl_proxy360(self): start_url = 'http://www.proxy360.cn/Region/China' print('Crawling', start_url) html = self.get_page(start_url) if html: doc = pq(html) lines = doc('div[name="list_proxy_ip"]').items() for line in lines: ip = line.find('.tbBottomLine:nth-child(1)').text() port = line.find('.tbBottomLine:nth-child(2)').text() yield { 'scheme': 'http', 'proxy': ':'.join([ip, port]) } def crwal_xici(self): start_url='http://www.xicidaili.com/' html=self.get_page(start_url) soup=BeautifulSoup(html,'lxml') country=soup.find(class_='odd') countrys=country.next_siblings for out in countrys: if(type(out)==bs4.element.Tag): infos=out.find_all('td') if len(infos)>0: ip=infos[1].string port=infos[2].string yield { 'scheme':'http', 'proxy':':'.join([ip,port]) } #运行各个解析方法 def run(self): #获取解析到的数据 results1=self.crawl_proxy360() for result1 in results1: print('Getter Proxy:',result1) out=result1.get('scheme').lower() #将解析过的数据添加到redis数据库中 self.conn.add(shecme=out,proxy=result1.get('proxy')) results2=self.crwal_xici() for result2 in results2: print('Getter Proxy:',result2) out=result2.get('scheme').lower() self.conn.add(shecme=out,proxy=result2.get('proxy')) results3=self.crawl_goubanjia(10) for result3 in results3: print('Getter Proxy:',result3) out=result3.get('scheme').lower() self.conn.add(shecme=out,proxy=result3.get('proxy')) if __name__=='__main__': test=ProxyGetter() test.run()
这个类主要就是爬取各大代理网站,然后把取得的IP写入到数据库中,没有什么难的
代理IP检测:VaildTest类
这个类主要就是以抓取到的代理IP构建请求,如果请求成功则将代理IP放置在redis数据库的最后,请求失败则将代理IP位置向前移一位import redis import random from Myself.config import * class RedisClient(object): #初始化驱动 def __init__(self, host=HOST, port=PORT, domain=DOMAIN): if PASSWORD: self._db = redis.Redis(host=host, port=port, password=PASSWORD) else: self._db = redis.Redis(host=host, port=port) self.domain = domain #获取数据库中所有键值 def keys(self): return self._db.keys(self.key('*')) #获取某个键 def key(self,shecme): return '{domain}:{scheme}'.format(domain=self.domain,scheme=shecme) #将proxy传入scheme中 def add(self,shecme,proxy): return self._db.zadd(self.key(shecme),proxy,DEFAULT_SCORE) #返回固定键值的所有值 def all(self,scheme): return self._db.zrange(self.key(scheme),0,-1) #更新scheme键中的proxy参数,将它放在最后一个 def up(self,scheme,proxy): score=self._db.zscore(self.key(scheme),proxy) return self._db.zincrby(self.key(scheme),proxy,MAX_SCORE-score) #更新scheme键中的proxy参数,将他往前移一个 def down(self,scheme,proxy): self._db.zincrby(self.key(scheme), proxy, -1) #如果此代理的分数值小于最小分数值,则将此删除 if self._db.zscore(self.key(scheme), proxy) <= MIN_SCORE: self._db.zrem(self.key(scheme), proxy) #设置proxy为使用状态,将数值设为50 def set_use(self,proxy): self._db.zincrby(self.key('http'), proxy, -50) #获取一个可用IP def get_proxy(self): proxy='' proxy_db=self._db.zrange(self.key('http'),-2,-1) for out in proxy_db: proxy=out.decode('utf-8').strip() self.set_use(proxy) return proxy #获取特定数量的可用IP,做测试用 def use_test(self): return self._db.zrange(self.key('http'), -8, -1) if __name__=='__main__': test=RedisClient() out=test.use() print(type(out)) print(out)
在这里因为要构建大量请求,所以我使用了grequests类来异步构建请求,grequests的具体用法请百度
代理池数据库的存储与操作:RedisClient类
因为我们的代理IP都是要放在redis数据库里面的,无论是存储,使用还是删除,我们最终的操作都是要落实到数据库里面的,所以我定义了一个RedisClient类来操作数据库import redis import random from Myself.config import * class RedisClient(object): #初始化驱动 def __init__(self, host=HOST, port=PORT, domain=DOMAIN): if PASSWORD: self._db = redis.Redis(host=host, port=port, password=PASSWORD) else: self._db = redis.Redis(host=host, port=port) self.domain = domain #获取数据库中所有键值 def keys(self): return self._db.keys(self.key('*')) #获取某个键 def key(self,shecme): return '{domain}:{scheme}'.format(domain=self.domain,scheme=shecme) #将proxy传入scheme中 def add(self,shecme,proxy): return self._db.zadd(self.key(shecme),proxy,DEFAULT_SCORE) #返回固定键值的所有值 def all(self,scheme): return self._db.zrange(self.key(scheme),0,-1) # be90 更新scheme键中的proxy参数,将它放在最后一个 def up(self,scheme,proxy): score=self._db.zscore(self.key(scheme),proxy) return self._db.zincrby(self.key(scheme),proxy,MAX_SCORE-score) #更新scheme键中的proxy参数,将他往前移一个 def down(self,scheme,proxy): self._db.zincrby(self.key(scheme), proxy, -1) #如果此代理的分数值小于最小分数值,则将此删除 if self._db.zscore(self.key(scheme), proxy) <= MIN_SCORE: self._db.zrem(self.key(scheme), proxy) #设置proxy为使用状态,将数值设为50 def set_use(self,proxy): self._db.zincrby(self.key('http'), proxy, -50) #从数据库中获取一个可用IP def get_proxy(self): proxy='' proxy_db=self._db.zrange(self.key('http'),-2,-1) for out in proxy_db: proxy=out.decode('utf-8').strip() self.set_use(proxy) return proxy #获取特定数量的可用IP,做测试用 def use_test(self): return self._db.zrange(self.key('http'), -8, -1) if __name__=='__main__': test=RedisClient() out=test.use() print(type(out)) print(out)
配置文件:
在程序运行的时候,会有一些具体的参数,例如redis的用户名和密码、获取代理和检测代理的周期,测试要用的网址等#域名 DOMAIN='proxy' HOST='localhost' PORT=6379 PASSWORD='' #测试要用的网址 TEST_URL = 'http://www.baidu.com' #设置获取代理的周期 GET_PROXY_CYCLE=500 #设置检测代理的周期 TEST_PROXY_CYCLE=100 #获取代理的开关 GET_PROXY_PROCESS = True #检测代理的开关 VALID_TEST_PROCESS = True #设置代理的最大分数值 MAX_SCORE = 100 #设置代理的最小分数值 MIN_SCORE = 2 #设置代理的默认分数值 DEFAULT_SCORE = 10 #设置使用中的代理的分数值 USE_STATUS_SCORE=50
以上就是代理池的具体架构和实现思路
相关文章推荐
- python爬虫由浅入深15---利用Redis+Flask来维护代理池和Cookie池
- 利用redis的订阅和发布来实现实时监控的一个DEMO(Python版本)
- python中利用redis构建任务队列(queue)
- python爬虫抓取豆瓣所有恐怖片信息(利用多线程和构建免费ip代理池)
- python利用redis构成一个队列
- 利用Python的Flask框架来构建一个简单的数字商品支付解决方案
- 利用redis的订阅和发布来实现实时监控的一个DEMO(Python版本)
- Python3利用网页接口制作一个免费的VIP视频播放软件
- 推荐一个免费的构建Beyesian贝叶斯概率网络的JAVA,C#,.NET, R, Matlab,Python,Ruby 的API
- 使用Python构建一个基于k-means的文档分类器
- 利用python搭建一个简单的http服务器
- 利用socket.io构建一个聊天室
- 利用python生成一个导出数据库的bat脚本文件的方法
- 第三百五十七节,Python分布式爬虫打造搜索引擎Scrapy精讲—利用开源的scrapy-redis编写分布式爬虫代码
- 利用Roslyn构建一个简单的C#交互脚本引擎
- 如何利用Linux构建免费的缓存DNS服务器
- 用 Python 构建一个极小的区块链
- 利用Azure Redis Cache构建百万量级缓存读写
- 利用python闭包性质写只用函数来写一个类
- 利用python在gnuradio下写一个block(python 学习)