
Building a Free Proxy IP Service in Python

Python scrapes proxy IPs from the major free proxy sites, validates them and stores the good ones, keeps re-validating every IP in the store, and exposes the pool as an API with Flask.

Repository: https://github.com/zjl1110/ProxyIPGet

Directory layout:

ProxyIPGet
|----app
    |----flaskrun.py (the Flask app)
    |----static (unused)
    |----templates (unused)
|----checkout_script.py (continuously re-validates the IPs in the store)
|----common (shared modules)
    |----__init__.py
    |----email_manager.py (e-mail module, unused)
    |----html_manager.py (HTML module, unused)
    |----ip_db_manager.py (store/fetch IPs)
    |----log_manager.py (logging module, unused)
    |----redis_manager.py (Redis module)
    |----request_common.py (request helpers)
    |----request_manager.py (request module)
    |----setting.py (settings module)
    |----url_manager.py (URL module, unused)
|----run.py (scrape, validate, and store IPs)
|----runapp.py (main entry point that launches all three programs)

email_manager.py:

# -*- coding: utf-8 -*-
#__author__="ZJL"

import smtplib
import email.mime.multipart
import email.mime.text
from common.setting import toEmail, emailName, emailPassword, smtp_connect

class EmailTloost:
    def __init__(self, toemail, totilte, totext):
        self.toemail = toemail
        self.emailname = emailName
        self.emailpassword = emailPassword
        self.smtp_connect = smtp_connect

        self.msg = email.mime.multipart.MIMEMultipart()
        self.msg['from'] = self.emailname
        self.msg['to'] = self.toemail
        self.msg['subject'] = totilte
        self.content = totext
        self.txt = email.mime.text.MIMEText(self.content)
        self.msg.attach(self.txt)

    def sendEmail(self):
        smtp = smtplib.SMTP()
        smtp.connect(self.smtp_connect, '25')
        smtp.login(self.emailname, self.emailpassword)
        smtp.sendmail(self.emailname, self.toemail, str(self.msg))
        smtp.quit()

def batchSendEmail(totilte, totext):
    # Send the same message to every address in toEmail
    for toemail in toEmail:
        e = EmailTloost(toemail, totilte, totext)
        e.sendEmail()

# batchSendEmail("xxx","hahahah")


html_manager.py:

# -*- coding: utf-8 -*-
#__author__="ZJL"

from common.redis_manager import RedisManager as rm

# HTML page manager
class HtmlManager(object):
    def __init__(self, new_htmls="new_htmls", **key):
        self.rm = rm()
        self.new_htmls = new_htmls

    # Add a single new HTML page to the manager
    def add_new_html(self, html):
        if html is None:
            return
        # Only add the page if it is not already in the queue
        if not self.rm.isExist(self.new_htmls, html):
            self.rm.setSets(self.new_htmls, html)

    # Add multiple new HTML pages to the manager
    def add_new_htmls(self, htmls):
        if htmls is None or len(htmls) == 0:
            return
        for html in htmls:
            self.add_new_html(html)

    # Check whether the manager holds any new HTML pages
    def has_new_html(self):
        return self.rm.setsLen(self.new_htmls) != 0

    # Pop one HTML page from the manager
    def get_new_html(self):
        new_html = self.rm.getSetsOneDel(self.new_htmls)
        return new_html


ip_db_manager.py:

# -*- coding: utf-8 -*-
#__author__="ZJL"

import random

# Save a validated IP to Redis (hash field = timestamp, value = ip:port);
# the pool is capped at 110 entries, evicting the oldest timestamp first
def Ip_DBSave(rm, db_name, ip_port, ip_time):
    try:
        ip_times = rm.getKeyAllAttribute(db_name)
        if len(ip_times) < 110:
            rm.setKeyValue(db_name, ip_time, ip_port)
        else:
            ip_times.sort(reverse=True)
            rm.delAttribute(db_name, ip_times[-1])
            rm.setKeyValue(db_name, ip_time, ip_port)
    except Exception as e:
        return e, "Ip_DBSave"

# Fetch a random IP
def Ip_DBGet(rm, db_name):
    try:
        ip_times = rm.getKeyAllAttribute(db_name)
        ip_len = len(ip_times)
        ip_prot = rm.getKeyValue(db_name, ip_times[random.randint(0, ip_len - 1)])
        return ip_prot
    except Exception as e:
        return e, "Ip_DBGet"

# Fetch all IPs, returned as {ip: timestamp}
def Ip_DBGetAll(rm, db_name):
    ip_prots = {}
    try:
        ip_times = rm.getKeyAllAttribute(db_name)
        for ip_time in ip_times:
            ip = rm.getKeyValue(db_name, ip_time)
            ip_prots[ip] = ip_time
        return ip_prots
    except Exception as e:
        return e, "Ip_DBGetAll"


log_manager.py:

# -*- coding: utf-8 -*-
#__author__="ZJL"

# Log manager
# Errors come in two levels: only a code ending in "1" triggers an e-mail alert,
# and an oversized log file triggers one as well.
# The trailing digit encodes the level so the leading digits can mark the crawl stage.
import logging, traceback, os
from common.email_manager import batchSendEmail
from common.setting import logfilename, errortitle

def objLogging(errorcode, errortext):
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                        datefmt='%a, %d %b %Y %H:%M:%S',
                        filename=logfilename,
                        filemode='a+')

    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
    console.setFormatter(formatter)
    logging.getLogger('').addHandler(console)

    if errorcode[-1] == "0":
        text = errortext + "\n" + traceback.format_exc()
        logging.debug(text)
    elif errorcode[-1] == "1":
        text = errortext + "\n" + traceback.format_exc()
        logging.warning(text)
        try:
            batchSendEmail(errortitle, text)
        except Exception:
            logging.warning(traceback.format_exc())
    else:
        text = errortext + "\n" + traceback.format_exc()
        logging.warning(text)

    filesize = os.path.getsize(logfilename)
    if filesize >= 3000000:
        try:
            batchSendEmail("Log file too large", "The log file exceeds 3 MB; please deal with it promptly")
        except Exception:
            logging.warning(traceback.format_exc())
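
The two-level codes are easiest to see with a small example; the codes and messages below are invented, and only the trailing digit matters:

from common.log_manager import objLogging

try:
    1 / 0
except Exception:
    objLogging("20", "stage 2 failed")    # trailing "0": debug log only
    # objLogging("21", "stage 2 failed")  # trailing "1": warning + e-mail alert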


redis_manager.py:

# -*- coding: utf-8 -*-
#__author__="ZJL"

import redis
from common.setting import redis_db, redis_host, redis_port

# Redis queue manager
class RedisManager(object):
    def __init__(self, host=redis_host, port=redis_port, db=redis_db):
        self.pool = redis.ConnectionPool(host=host, port=port, db=db)
        self.r = redis.StrictRedis(connection_pool=self.pool)

    # Store data of any format
    def setData(self, keyname, data):
        data = self.r.set(keyname, data)
        return data

    # Fetch data
    def getData(self, keyname, coding="utf-8"):
        data = self.r.get(keyname)
        data = data.decode(coding)
        return data

    # Fetch data, then delete it
    def getDataDel(self, keyname, coding="utf-8"):
        data = self.r.get(keyname)
        data = data.decode(coding)
        # delete
        self.r.delete(keyname)
        return data

    # Store values only; one key holds multiple values (a list)
    def setValue(self, keyname, data):
        data = self.r.lpush(keyname, data)
        return data

    # Pop a value (blocking), removing it from the list
    def getValue(self, keyname, coding="utf-8"):
        data = self.r.brpop(keyname, 0)[1]
        data = data.decode(coding)
        return data

    # Store field/value pairs; one key holds multiple fields (a hash)
    def setKeyValue(self, keyname, datakey, data):
        state = self.r.hset(keyname, datakey, data)
        if state == 0:
            return True
        else:
            return False

    # Fetch a field's value
    def getKeyValue(self, keyname, datakey, coding="utf-8"):
        data = self.r.hget(keyname, datakey)
        data = data.decode(coding)
        return data

    # Fetch a field's value, then delete the field
    def getKeyValueDel(self, keyname, datakey, coding="utf-8"):
        data = self.r.hget(keyname, datakey)
        data = data.decode(coding)
        # delete
        self.r.hdel(keyname, datakey)
        return data

    # Delete a field by name
    def delAttribute(self, keyname, datakey):
        hdel = self.r.hdel(keyname, datakey)
        if hdel == 1:
            return True
        else:
            return False

    # Get all field names under a key
    def getKeyAllAttribute(self, keyname):
        hkeys = self.r.hkeys(keyname)
        return hkeys

    # Get the names of all keys
    def getKey(self):
        keys = self.r.keys()
        return keys

    # Get how many items remain under one key (list length)
    def getLen(self, keyname):
        llen = self.r.llen(keyname)
        return llen

    # Check whether a key exists
    def getExists(self, keyname):
        exists = self.r.exists(keyname)
        return exists

    # Get the number of keys
    def getDbsize(self):
        dbsize = self.r.dbsize()
        return dbsize

    # Delete a key
    def deleteKy(self, keyname):
        delete = self.r.delete(keyname)
        if delete == 1:
            return True
        else:
            return False

    # Flush all data in the current database
    def flushDB(self):
        flushdb = self.r.flushdb()
        return flushdb

    # ====== sets ==========

    # Add data; sets deduplicate automatically, returns how many were added
    def setSets(self, keyname, *data):
        return self.r.sadd(keyname, *data)

    # Fetch the whole set, returned as a list
    def getSetsList(self, keyname, coding="utf-8"):
        datas = self.r.smembers(keyname)
        datas = [d.decode(coding) for d in datas]
        return datas

    # Fetch the whole set as a list, then delete its members
    def getSetsListDel(self, keyname, coding="utf-8"):
        datas = self.r.smembers(keyname)
        datas = [d.decode(coding) for d in datas]
        [self.r.srem(keyname, d) for d in datas]
        return datas

    # Fetch one element of the set
    def getSetsOne(self, keyname, coding="utf-8"):
        data = self.r.smembers(keyname)
        data = [d.decode(coding) for d in data]
        if len(data) > 0:
            return data.pop()
        else:
            return

    # Fetch one element of the set and delete it
    def getSetsOneDel(self, keyname, coding="utf-8"):
        datas = self.r.smembers(keyname)
        datas = [d.decode(coding) for d in datas]
        if len(datas) > 0:
            data = datas.pop()
            self.r.srem(keyname, data)
            return data
        else:
            return

    # Delete elements from the set, returns how many were removed
    def setsDel(self, keyname, *data):
        return self.r.srem(keyname, *data)

    # Check whether an element is in the set
    def isExist(self, keyname, data):
        return self.r.sismember(keyname, data)

    # Set cardinality
    def setsLen(self, keyname):
        return self.r.scard(keyname)

    # Intersection of several sets, returned as a list
    def setsIntersection(self, *keyname):
        data = self.r.sinter(keyname)
        data = [d.decode("utf-8") for d in data]
        return data

    # Union of several sets, returned as a list
    def setsAndSet(self, *keyname):
        data = self.r.sunion(keyname)
        data = [d.decode("utf-8") for d in data]
        return data
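
A short sketch of the operations the rest of the project actually leans on, assuming a local Redis; the values are invented, but the key names mirror run.py and url_manager.py:

from common.redis_manager import RedisManager

rm = RedisManager()
# Hash: how the proxy pool is stored (field = timestamp, value = ip:port)
rm.setKeyValue("proxyIP", "1491750000.0", "1.2.3.4:8080")
print(rm.getKeyAllAttribute("proxyIP"))      # [b'1491750000.0']
# Set: how the URL queues are stored (duplicates dropped automatically)
rm.setSets("new_urls", "http://a/1", "http://a/2", "http://a/1")
print(rm.setsLen("new_urls"))                # 2
print(rm.getSetsOneDel("new_urls"))          # pops one of the two URLs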


request_common.py:

# -*- coding: utf-8 -*-
#__author__="ZJL"

# A retry decorator (default: one try). The sets parameter is a hook for
# pushing failed URLs and error info into Redis (or elsewhere).
def asyncRetry(num_retries=1, sets=None):
    # Receives the function being decorated
    def wrapper(func):
        # Receives the function's arguments; written as a coroutine
        async def wrapper(*args, **kwargs):
            # Keep the last exception around so we can see what was raised
            last_exception = None
            # Call the wrapped function in a loop
            for _ in range(num_retries):
                try:
                    # On success, return the awaited result, which exits the loop
                    return await func(*args, **kwargs)
                except Exception as e:
                    # Don't return on a failure, or the loop would stop;
                    # this is where the failing URL could be pushed to Redis:
                    # sets(args[0])
                    last_exception = e
            # Re-raise here if failures should propagate:
            # raise last_exception
        return wrapper
    return wrapper
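
To see the decorator in action, something like the following should work; the flaky coroutine and its failure rate are made up for illustration, and since there is no sets hook and no re-raise, exhausted retries simply return None:

import asyncio
import random
from common.request_common import asyncRetry

@asyncRetry(num_retries=3)
async def flaky():
    if random.random() < 0.7:      # fail ~70% of the time
        raise RuntimeError("boom")
    return "ok"

loop = asyncio.get_event_loop()
print(loop.run_until_complete(flaky()))   # "ok", or None if all 3 attempts failed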


request_manager.py:

# -*- coding: utf-8 -*-
#__author__="ZJL"

import aiohttp

# Request manager
class RequestManager(object):
    def __init__(self):
        self.session = aiohttp.ClientSession()

    def get(self, url, *, allow_redirects=True, **kwargs):
        return self.session.get(url, allow_redirects=allow_redirects, **kwargs)

    def post(self, url, *, data=None, **kwargs):
        return self.session.post(url, data=data, **kwargs)
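
A sketch of how it is consumed, mirroring getPage in run.py (note that newer aiohttp releases expect the ClientSession to be created inside a running event loop, so this pattern belongs to the aiohttp of the time):

import asyncio
from common.request_manager import RequestManager

async def fetch(url):
    async with RequestManager().session as session:
        async with session.get(url) as resp:
            return await resp.text()

loop = asyncio.get_event_loop()
print(loop.run_until_complete(fetch("http://httpbin.org/ip")))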

setting.py:

# -*- coding: utf-8 -*-
#__author__="ZJL"

# Recipient addresses
toEmail = ["xxxxxxxx@qq.com"]

smtp_connect = "smtp.163.com"

# Sender address
emailName = "xxxxxx@163.com"

# Sender password
emailPassword = "xxxxx"

# Redis IP
redis_host = "127.0.0.1"

# Redis port
redis_port = 6379

# Redis DB
redis_db = 1

# Log file name
logfilename = "Logging.log"

# E-mail subject
errortitle = "Program error report"

User_Agent = ["Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)",
              "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36"]


url_manager.py:

# -*- coding: utf-8 -*-
#__author__="ZJL"

from common.redis_manager import RedisManager as rm

# URL manager

class UrlManager(object):
    # new_urls is the to-crawl queue, old_urls the crawled queue, error_urls the failed queue
    def __init__(self, new_urls="new_urls", old_urls="old_urls", error_urls="error_urls", **key):
        # Redis queues
        self.rm = rm()
        # URLs waiting to be crawled
        self.new_urls = new_urls
        # URLs already crawled
        self.old_urls = old_urls
        # URLs that failed
        self.error_urls = error_urls

    # Add a single new URL to the manager
    def add_new_url(self, url):
        if url is None:
            return
        # Only add the URL if it is in neither the to-crawl nor the crawled queue
        if not self.rm.isExist(self.new_urls, url) and not self.rm.isExist(self.old_urls, url):
            self.rm.setSets(self.new_urls, url)

    # Add a failed URL to the manager
    def add_error_url(self, url):
        if url is None:
            return
        self.rm.setSets(self.error_urls, url)

    # Add multiple new URLs to the manager
    def add_new_urls(self, urls):
        if urls is None or len(urls) == 0:
            return
        for url in urls:
            self.add_new_url(url)

    # Check whether the manager has URLs left to crawl
    def has_new_url(self):
        return self.rm.setsLen(self.new_urls) != 0

    # Get one URL to crawl from the manager
    def get_new_url(self):
        new_url = self.rm.getSetsOneDel(self.new_urls)
        self.rm.setSets(self.old_urls, new_url)
        return new_url

    # Get all URLs waiting to be crawled
    def get_new_urls(self):
        new_urls = self.rm.getSetsListDel(self.new_urls)
        for new_url in new_urls:
            self.rm.setSets(self.old_urls, new_url)
        return new_urls
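
Typical usage, with invented URLs; note that get_new_url also records the URL as crawled, so it will not be re-queued later:

from common.url_manager import UrlManager

um = UrlManager()
um.add_new_urls(["http://example.com/page1", "http://example.com/page2"])
while um.has_new_url():
    print(um.get_new_url())   # moves the URL from new_urls to old_urls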


run.py:

# -*- coding: utf-8 -*-
#__author__="ZJL"

from common.redis_manager import RedisManager
from common.request_manager import RequestManager
from common.request_common import asyncRetry
from common.url_manager import UrlManager
from common.ip_db_manager import Ip_DBSave
from bs4 import BeautifulSoup as bs
import asyncio
import time
import random
import requests

# Shared request headers
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36"
}

# IP store
rdb = RedisManager(db="4")

# Unused as a URL queue; kept so the retry decorator has an error-URL hook
rm = UrlManager()

# Retry mechanism
@asyncRetry(4, rm.add_error_url)
async def getPage(url):
    # asyncio.Semaphore() limits how many coroutines run at once
    # (each call creates its own semaphore here, so it doesn't throttle across tasks)
    sem = asyncio.Semaphore(5)
    with (await sem):
        async with RequestManager().session as session:
            async with session.get(url, headers=headers, timeout=360) as resp:
                # Pause a moment; hitting the sites too hard is an easy way to get banned.
                # asyncio.sleep keeps the event loop free while waiting.
                await asyncio.sleep(random.random() * 5)
                # Assert on the status code
                assert resp.status == 200
                # Dispatch each URL to its own parser
                if "xicidaili" in url:
                    body = await resp.text()
                    xici_grabPage(url, body)
                elif "kuaidaili" in url:
                    body = await resp.text()
                    kuaidaili_grabPage(url, body)
                elif "nianshao" in url:
                    body = await resp.text()
                    nianshao_grabPage(url, body)
                elif "66ip" in url:
                    body = await resp.text()
                    ip66_grabPage(url, body)
                elif "httpsdaili" in url:
                    body = await resp.text()
                    httpsdaili_grabPage(url, body)
                elif "swei360" in url:
                    body = await resp.text()
                    swei360_grabPage(url, body)
                elif "kxdaili" in url:
                    body = await resp.text()
                    kxdaili_grabPage(url, body)
                else:
                    return await resp.text()
        # Close the session (already closed by the async with above)
        session.close()

# A separate parse function for each site
def xici_grabPage(url, body):
    try:
        soup = bs(body, "lxml")
        trs = soup.find(id="ip_list").find_all("tr")
        for index, tr in enumerate(trs):
            if index > 0:
                for index, td in enumerate(tr.find_all("td")):
                    if index == 1:
                        ip = td.text
                    elif index == 2:
                        port = td.text
                        checkout_ip(ip + ":" + port, url)
    except Exception as e:
        return e, "xici_grabPage"

def kuaidaili_grabPage(url, body):
    try:
        soup = bs(body, "lxml")
        trs = soup.find(id="list").find_all("tr")
        for index, tr in enumerate(trs):
            if index > 0:
                for index, td in enumerate(tr.find_all("td")):
                    if index == 0:
                        ip = td.text
                    elif index == 1:
                        port = td.text
                        print(ip + ":" + port)
                        checkout_ip(ip + ":" + port, url)
    except Exception as e:
        return e, "kuaidaili_grabPage"

def nianshao_grabPage(url, body):
    try:
        soup = bs(body, "lxml")
        trs = soup.find(class_="table").find_all("tr")
        for index, tr in enumerate(trs):
            if index > 0:
                for index, td in enumerate(tr.find_all("td")):
                    if index == 0:
                        ip = td.text
                    elif index == 1:
                        port = td.text
                        print(ip + ":" + port)
                        checkout_ip(ip + ":" + port, url)
    except Exception as e:
        return e, "nianshao_grabPage"

def ip66_grabPage(url, body):
    try:
        soup = bs(body, "lxml")
        trs = soup.find("table", width='100%').find_all("tr")
        for index, tr in enumerate(trs):
            if index > 0:
                for index, td in enumerate(tr.find_all("td")):
                    if index == 0:
                        ip = td.text
                    elif index == 1:
                        port = td.text
                        print(ip + ":" + port)
                        checkout_ip(ip + ":" + port, url)
    except Exception as e:
        return e, "ip66_grabPage"

def httpsdaili_grabPage(url, body):
    try:
        soup = bs(body, "lxml")
        trs = soup.find("table", class_="table table-bordered table-striped").find_all("tr")
        for index, tr in enumerate(trs):
            if index > 0:
                for index, td in enumerate(tr.find_all("td")):
                    if index == 0:
                        ip = td.text
                    elif index == 1:
                        port = td.text
                        print(ip + ":" + port)
                        checkout_ip(ip + ":" + port, url)
    except Exception as e:
        return e, "httpsdaili_grabPage"

def swei360_grabPage(url, body):
    try:
        soup = bs(body, "lxml")
        trs = soup.find("div", id="list").find_all("tr")
        for index, tr in enumerate(trs):
            if index > 0:
                for index, td in enumerate(tr.find_all("td")):
                    if index == 0:
                        ip = td.text
                    elif index == 1:
                        port = td.text
                        print(ip + ":" + port)
                        checkout_ip(ip + ":" + port, url)
    except Exception as e:
        return e, "swei360_grabPage"

def kxdaili_grabPage(url, body):
    try:
        soup = bs(body, "lxml")
        trs = soup.find("table", class_="ui table segment").find_all("tr")
        for index, tr in enumerate(trs):
            if index > 0:
                for index, td in enumerate(tr.find_all("td")):
                    if index == 0:
                        ip = td.text
                    elif index == 1:
                        port = td.text
                        print(ip + ":" + port)
                        checkout_ip(ip + ":" + port, url)
    except Exception as e:
        return e, "kxdaili_grabPage"

# Validate an IP by fetching a Baidu search page through it
def checkout_ip(ip_port, xurl=""):
    s = requests.session()
    try:
        proxies = {
            "http": "http://" + ip_port,
        }
        url = "https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=0&rsv_idx=1&tn=baidu&wd=python&rsv_pq=b3fb9f5200036a4f&rsv_t=04cdhQxxUlftjer%2FovL4Xb6B2ySx%2F%2BMhjXIPfJV24Ezf7GRFVpuhiYmxzmw&rqlang=cn&rsv_enter=1&rsv_sug3=7&rsv_sug1=1&rsv_sug7=100&rsv_sug2=0&inputT=2391&rsv_sug4=3002&rsv_sug=2"
        # url = "http://www.ip181.com/"
        r = s.get(url, headers=headers, proxies=proxies, timeout=360)
        time.sleep(random.random() * 2)
        assert r.status_code == 200
    except Exception as e:
        return e, "checkout_ip"
    else:
        print(xurl + " " + ip_port + " OK")
        ip_time = time.time()
        db_name = "proxyIP"
        Ip_DBSave(rdb, db_name, ip_port, ip_time)
    finally:
        s.close()

def main():
    # Pages per site
    page_num = 5
    # Base URLs
    page_url_base = [
        'http://www.xicidaili.com/nn/',
        'http://www.kuaidaili.com/free/inha/',
        'http://www.nianshao.me/?page=',
        'http://www.66ip.cn/',
        # 'http://www.goubanjia.com/free/anoy/%E9%AB%98%E5%8C%BF/',
        'http://www.httpsdaili.com/?page=',
        'http://www.swei360.com/free/?stype=1&page=',
        'http://www.kxdaili.com/dailiip/1/'
    ]

    # Build the full URL list, respecting each site's pagination scheme
    page_urls = []
    for url in page_url_base:
        if "66ip" in url or "kxdaili" in url:
            for num in range(1, page_num + 1):
                new_url = url + str(num) + ".html"
                page_urls.append(new_url)
        elif "goubanjia" in url:
            for num in range(1, page_num + 1):
                new_url = url + "index" + str(num) + ".shtml"
                page_urls.append(new_url)
        else:
            for num in range(1, page_num + 1):
                new_url = url + str(num)
                page_urls.append(new_url)
    # Create a fresh event loop on each call, since it is closed below
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # Coroutine tasks
    tasks = [getPage(host) for host in page_urls]
    # Run the coroutines on the event loop
    loop.run_until_complete(asyncio.gather(*tasks))
    # Shut down
    loop.close()

if __name__ == '__main__':
    # start = time.time()
    while True:
        main()
        time.sleep(6000 * 2)
    # print("Elapsed Time: %s" % (time.time() - start))

'''
Sample page URLs:
http://www.xicidaili.com/nn/4
http://www.kuaidaili.com/free/inha/8/
http://www.data5u.com/
http://www.66ip.cn/3.html
http://www.nianshao.me/?page=2
http://www.goubanjia.com/free/anoy/%E9%AB%98%E5%8C%BF/index2.shtml
http://www.httpsdaili.com/?page=3
http://www.swei360.com/free/?stype=1&page=2
'''
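
Each parse function is easy to exercise offline with canned HTML before pointing it at the live site. A sketch of the extraction logic xici_grabPage relies on; the table snippet is invented, and a print stands in for checkout_ip so no network is needed:

from bs4 import BeautifulSoup as bs

html = """
<table id="ip_list">
  <tr><th>flag</th><th>ip</th><th>port</th></tr>
  <tr><td></td><td>1.2.3.4</td><td>8080</td></tr>
</table>
"""
trs = bs(html, "lxml").find(id="ip_list").find_all("tr")
for index, tr in enumerate(trs):
    if index > 0:
        tds = tr.find_all("td")
        print(tds[1].text + ":" + tds[2].text)   # -> 1.2.3.4:8080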


checkout_script.py:

# -*- coding: utf-8 -*-
#__author__="ZJL"

from common.ip_db_manager import Ip_DBGetAll
from common.redis_manager import RedisManager
import requests
import time
import random

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36"
}

# Keep checking whether the IPs in the store are still valid
def checkout_IP():
    rdb = RedisManager(db="4")
    db_name = "proxyIP"
    ip_list = Ip_DBGetAll(rdb, db_name)
    for ip, ip_time in ip_list.items():
        web_checkout_ip(rdb, db_name, ip, ip_time)

# Validate an IP against Baidu; drop it from the store if the request fails
def web_checkout_ip(rm, db_name, ip_port, ip_time):
    s = requests.session()
    try:
        proxies = {
            "http": "http://" + ip_port,
        }
        url = "https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=0&rsv_idx=1&tn=baidu&wd=python&rsv_pq=b3fb9f5200036a4f&rsv_t=04cdhQxxUlftjer%2FovL4Xb6B2ySx%2F%2BMhjXIPfJV24Ezf7GRFVpuhiYmxzmw&rqlang=cn&rsv_enter=1&rsv_sug3=7&rsv_sug1=1&rsv_sug7=100&rsv_sug2=0&inputT=2391&rsv_sug4=3002&rsv_sug=2"
        # url = "http://www.ip181.com/"
        r = s.get(url, headers=headers, proxies=proxies, timeout=360)
        time.sleep(random.random() * 2)
        assert r.status_code == 200
    except Exception as e:
        # r may not exist here if the request itself raised,
        # so don't touch r.status_code in this branch
        print("DEL", rm.delAttribute(db_name, ip_time))
        return e, "web_checkout_ip"
    else:
        print(r.status_code)
    finally:
        s.close()

def main():
    checkout_IP()

if __name__ == '__main__':
    while True:
        main()


flaskrun.py:

# -*- coding: utf-8 -*-
#__author__="ZJL"

from flask import Flask
from common.ip_db_manager import Ip_DBGet
from common.redis_manager import RedisManager

rdb = RedisManager(db="4")
db_name = "proxyIP"

app = Flask(__name__)

@app.route('/getip')
def get_ipport():
    ip_port = Ip_DBGet(rdb, db_name)
    return ip_port

# One handler per error code
@app.errorhandler(403)
def forbidden(error):
    return "403"

@app.errorhandler(404)
def page_not_found(error):
    return "404"

@app.errorhandler(410)
def gone(error):
    return "410"

@app.errorhandler(500)
def internal_error(error):
    return "500"

if __name__ == '__main__':
    app.run(debug=True)
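
Once the service is running, any HTTP client can pull a proxy from it; a quick check (httpbin.org/ip is just a convenient echo endpoint for testing):

import requests

proxy = requests.get("http://127.0.0.1:5000/getip").text
print(proxy)                                  # e.g. "1.2.3.4:8080"
r = requests.get("http://httpbin.org/ip",
                 proxies={"http": "http://" + proxy}, timeout=30)
print(r.text)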


runapp.py:

# -*- coding: utf-8 -*-
#__author__="ZJL"

import os
import multiprocessing

# print(os.system("python app/flaskrun.py"))
# print(os.system("python run.py"))

def worker(cmd):
    os.system(cmd)

if __name__ == "__main__":
    p1 = multiprocessing.Process(target=worker, args=("python3 app/flaskrun.py",))
    p2 = multiprocessing.Process(target=worker, args=("python3 run.py",))
    p3 = multiprocessing.Process(target=worker, args=("python3 checkout_script.py",))

    p1.start()
    p2.start()
    p3.start()
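
Wrapping os.system in multiprocessing.Process spawns an extra interpreter per script; a leaner sketch (not how the repo does it) launches the three scripts directly with subprocess:

import subprocess

procs = [subprocess.Popen(["python3", path])
         for path in ("app/flaskrun.py", "run.py", "checkout_script.py")]
for p in procs:
    p.wait()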


Results:

[screenshot: console output]

Enter in a browser:
http://127.0.0.1:5000/getip
Result:

[screenshot: the returned ip:port]