您的位置:首页 > 数据库 > Redis

利用Python和Redis构建一个免费的代理池

2017-11-27 18:14 519 查看
当我们使用爬虫大量重复的请求一个网站时,我们可能回去到封IP的情况,这时候我们就需要使用代理来伪装我们的IP,使之请求能够再次发起

代理池的整体构造

代理IP来源:各大代理IP网站

代理IP的获取:python爬虫

代理池的存储:redis数据库

代理IP检测:构建请求,检测是否可用

调度器:负责统筹调度以上各功能的执行

调度器:Scheduler类

调度器本身不具有任何具体功能,他只是负责调用已经存在的类和方法,来完成代理的抓取和检测

from Myself.config import *
from Myself.getter import ProxyGetter
from Myself.tester import VaildTest
from Myself.db import RedisClient
import time
import multiprocessing
from multiprocessing import Process

'''
调度器
调用tester来检测IP是否可用
调用getter来从各大网站获取代理IP
'''
class Scheduler():
#获取代理的方法,传入lock锁住数据库的操作,cycle为周期时间
def get_proxy(self,lock,cycle=GET_PROXY_CYCLE):
while True:
lock.acquire()
conn = RedisClient()
#具体的操作由ProxyGetter类发起
getter =ProxyGetter(conn)
print('正在获取IP')
try:
#执行检测程序
getter.run()
except Exception as e:
print('程序出了点小异常', e.args)
finally:
#释放锁并且休眠程序,等待周期结束
lock.release()
time.sleep(cycle)

#检测代理的方法,传入lock锁住数据库的操作,cycle为周期时间
def test_proxy(self,lock,cycle=TEST_PROXY_CYCLE):
while True:
lock.acquire()
conn = RedisClient()
# 具体的操作由RedisClient类发起
tester = VaildTest(conn)
print('正在检测IP')
try:
#执行检测程序
tester.valid_test()
except Exception as e:
print('程序出了点小异常',e.args)
finally:
# 释放锁并且休眠程序,等待周期结束
lock.release()
time.sleep(cycle)
def run(self):
#获取锁的对象
lock = multiprocessing.Lock()
if VALID_TEST_PROCESS:
valid_test_process = Process(target=self.test_proxy,args=(lock,))
valid_test_process.start()
if GET_PROXY_PROCESS:
get_proxy_process = Process(target=self.get_proxy,args=(lock,))
get_proxy_process.start()

if __name__=='__main__':
test=Scheduler()
test.run()


在run方法中我开了两个进程,分别启动test_proxy和get_proxy方法,方法中再具体调用ProxyGetter类和VaildTest类来完成具体功能的操作

因为涉及到redis数据库的操作,所以要为这两个进程加锁。cycle参数为两个进程运行的周期

代理IP的获取:ProxyGetter类

import requests
from bs4 import BeautifulSoup
from pyquery import PyQuery as pq
import bs4
from Myself.db import RedisClient

'''
这个类为获取代理IP的类
'''

class ProxyGetter(object):
# 数据库对象必须要以参数的形式传入,否则多进程的时候会报错(不能序列化_thread.lock对象)。
# 多进程操作数据库必须要注意进程锁问题
def __init__(self,conn):
self.headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.71 Safari/537.36',
'Accept-Encoding': 'gzip, deflate, sdch',
'Accept-Language': 'zh-CN,zh;q=0.8'
}
self.conn=conn

'''
此方法用来获取网页源代码
'''
def get_page(self,url,charset='utf-8',options={}):
headers=dict(self.headers,**options)
r = requests.get(url, headers=headers,timeout=3)
r.encoding=charset
try:
print('Getting result:',url,r.status_code)
if r.status_code==200:
return r.text
except ConnectionError:
print('Crawling Failed', url,r.status_code)

'''
此代理网站暂时无法使用
'''
def crawl_66(self,page_count):
start_url = 'http://www.66ip.cn/{}.html'
urls=[start_url.format(page) for page in range(1,page_count) ]
for url in urls:
html=self.get_page(url,charset='gb2312')
soup=BeautifulSoup(html,'lxml')
table=soup.select('table[bordercolor="#6699ff"]')[0]
trs=table.find_all('tr')[1:-1]
for out in trs:
ip=out.find_all('td')[0].string
port=out.find_all('td')[1].string
yield{
'scheme':'http',
'proxy':':'.join([ip,port])
}
'''
解析网页源代码,并且获取代理IP和端口
page_count为爬取的页数
'''
def crawl_goubanjia(self,page_count):
start_url = 'http://www.goubanjia.com/free/gngn/index{}.shtml'
#按照页数来构造多个网页
urls = [start_url.format(page) for page in range(1, page_count)]
#遍历每个网页
for url in urls:
html = self.get_page(url)
if html:
doc = pq(html)
trs = doc('table.table tr')
for tr in trs.items():
td = tr.find('td.ip')
td.find('p').remove()
proxy = td.text().replace(' ', '')
scheme = tr.find('td:nth-child(3)').text()
if ',' in scheme:
scheme = scheme.split(',')[0]
if scheme and proxy:
#构造迭代器并返回
yield {
'scheme': 'http',
'proxy': proxy
}
def crawl_proxy360(self):
start_url = 'http://www.proxy360.cn/Region/China'
print('Crawling', start_url)
html = self.get_page(start_url)
if html:
doc = pq(html)
lines = doc('div[name="list_proxy_ip"]').items()
for line in lines:
ip = line.find('.tbBottomLine:nth-child(1)').text()
port = line.find('.tbBottomLine:nth-child(2)').text()
yield {
'scheme': 'http',
'proxy': ':'.join([ip, port])
}

def crwal_xici(self):
start_url='http://www.xicidaili.com/'
html=self.get_page(start_url)
soup=BeautifulSoup(html,'lxml')
country=soup.find(class_='odd')
countrys=country.next_siblings
for out in countrys:
if(type(out)==bs4.element.Tag):
infos=out.find_all('td')
if len(infos)>0:
ip=infos[1].string
port=infos[2].string
yield {
'scheme':'http',
'proxy':':'.join([ip,port])
}
#运行各个解析方法
def run(self):
#获取解析到的数据
results1=self.crawl_proxy360()
for result1 in results1:
print('Getter Proxy:',result1)
out=result1.get('scheme').lower()
#将解析过的数据添加到redis数据库中
self.conn.add(shecme=out,proxy=result1.get('proxy'))
results2=self.crwal_xici()
for result2 in results2:
print('Getter Proxy:',result2)
out=result2.get('scheme').lower()
self.conn.add(shecme=out,proxy=result2.get('proxy'))
results3=self.crawl_goubanjia(10)
for result3 in results3:
print('Getter Proxy:',result3)
out=result3.get('scheme').lower()
self.conn.add(shecme=out,proxy=result3.get('proxy'))
if __name__=='__main__':
test=ProxyGetter()
test.run()


这个类主要就是爬取各大代理网站,然后把取得的IP写入到数据库中,没有什么难的

代理IP检测:VaildTest类

这个类主要就是以抓取到的代理IP构建请求,如果请求成功则将代理IP放置在redis数据库的最后,请求失败则将代理IP位置向前移一位

import redis
import random
from Myself.config import *

class RedisClient(object):

#初始化驱动
def __init__(self, host=HOST, port=PORT, domain=DOMAIN):
if PASSWORD:
self._db = redis.Redis(host=host, port=port, password=PASSWORD)
else:
self._db = redis.Redis(host=host, port=port)
self.domain = domain

#获取数据库中所有键值
def keys(self):
return self._db.keys(self.key('*'))

#获取某个键
def key(self,shecme):
return '{domain}:{scheme}'.format(domain=self.domain,scheme=shecme)

#将proxy传入scheme中
def add(self,shecme,proxy):
return self._db.zadd(self.key(shecme),proxy,DEFAULT_SCORE)

#返回固定键值的所有值
def all(self,scheme):
return self._db.zrange(self.key(scheme),0,-1)

#更新scheme键中的proxy参数,将它放在最后一个
def up(self,scheme,proxy):
score=self._db.zscore(self.key(scheme),proxy)
return self._db.zincrby(self.key(scheme),proxy,MAX_SCORE-score)

#更新scheme键中的proxy参数,将他往前移一个
def down(self,scheme,proxy):
self._db.zincrby(self.key(scheme), proxy, -1)
#如果此代理的分数值小于最小分数值,则将此删除
if self._db.zscore(self.key(scheme), proxy) <= MIN_SCORE:
self._db.zrem(self.key(scheme), proxy)

#设置proxy为使用状态,将数值设为50
def set_use(self,proxy):
self._db.zincrby(self.key('http'), proxy, -50)

#获取一个可用IP
def get_proxy(self):
proxy=''
proxy_db=self._db.zrange(self.key('http'),-2,-1)
for out in proxy_db:
proxy=out.decode('utf-8').strip()
self.set_use(proxy)
return proxy

#获取特定数量的可用IP,做测试用
def use_test(self):
return self._db.zrange(self.key('http'), -8, -1)

if __name__=='__main__':
test=RedisClient()
out=test.use()
print(type(out))
print(out)


在这里因为要构建大量请求,所以我使用了grequests类来异步构建请求,grequests的具体用法请百度

代理池数据库的存储与操作:RedisClient类

因为我们的代理IP都是要放在redis数据库里面的,无论是存储,使用还是删除,我们最终的操作都是要落实到数据库里面的,所以我定义了一个RedisClient类来操作数据库

import redis
import random
from Myself.config import *

class RedisClient(object):

#初始化驱动
def __init__(self, host=HOST, port=PORT, domain=DOMAIN):
if PASSWORD:
self._db = redis.Redis(host=host, port=port, password=PASSWORD)
else:
self._db = redis.Redis(host=host, port=port)
self.domain = domain

#获取数据库中所有键值
def keys(self):
return self._db.keys(self.key('*'))

#获取某个键
def key(self,shecme):
return '{domain}:{scheme}'.format(domain=self.domain,scheme=shecme)

#将proxy传入scheme中
def add(self,shecme,proxy):
return self._db.zadd(self.key(shecme),proxy,DEFAULT_SCORE)

#返回固定键值的所有值
def all(self,scheme):
return self._db.zrange(self.key(scheme),0,-1)

#
be90
更新scheme键中的proxy参数,将它放在最后一个
def up(self,scheme,proxy):
score=self._db.zscore(self.key(scheme),proxy)
return self._db.zincrby(self.key(scheme),proxy,MAX_SCORE-score)

#更新scheme键中的proxy参数,将他往前移一个
def down(self,scheme,proxy):
self._db.zincrby(self.key(scheme), proxy, -1)
#如果此代理的分数值小于最小分数值,则将此删除
if self._db.zscore(self.key(scheme), proxy) <= MIN_SCORE:
self._db.zrem(self.key(scheme), proxy)

#设置proxy为使用状态,将数值设为50
def set_use(self,proxy):
self._db.zincrby(self.key('http'), proxy, -50)

#从数据库中获取一个可用IP
def get_proxy(self):
proxy=''
proxy_db=self._db.zrange(self.key('http'),-2,-1)
for out in proxy_db:
proxy=out.decode('utf-8').strip()
self.set_use(proxy)
return proxy

#获取特定数量的可用IP,做测试用
def use_test(self):
return self._db.zrange(self.key('http'), -8, -1)

if __name__=='__main__':
test=RedisClient()
out=test.use()
print(type(out))
print(out)


配置文件:

在程序运行的时候,会有一些具体的参数,例如redis的用户名和密码、获取代理和检测代理的周期,测试要用的网址等

#域名
DOMAIN='proxy'

HOST='localhost'

PORT=6379

PASSWORD=''

#测试要用的网址
TEST_URL = 'http://www.baidu.com'

#设置获取代理的周期
GET_PROXY_CYCLE=500

#设置检测代理的周期
TEST_PROXY_CYCLE=100

#获取代理的开关
GET_PROXY_PROCESS = True

#检测代理的开关
VALID_TEST_PROCESS = True

#设置代理的最大分数值
MAX_SCORE = 100

#设置代理的最小分数值
MIN_SCORE = 2

#设置代理的默认分数值
DEFAULT_SCORE = 10

#设置使用中的代理的分数值
USE_STATUS_SCORE=50


以上就是代理池的具体架构和实现思路
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  python redis 爬虫