【2022 年】崔庆才 Python3 爬虫教程 – 高效代理池的维护
这是爬虫专栏第「31」篇原创
咱们在上一节中了解了各个恳求库设置署理的各个办法,可是怎么实时高效地获取到许多可用的署理是一个问题。
首要,在互联网上有许多揭露的免费署理。当然,咱们也能够购买付费的署理 IP,可是署理不论是免费的仍是付费的,都不能保证是可用的,由于此 IP 或许被其他人用来爬取相同的方针站点而被封禁,或许署理服务器忽然发生故障或网络繁忙。一旦咱们选用了一个不可用的署理,这势必会影响爬虫的工作功率。
所以,咱们需求提前做挑选,将不可用的署理剔除掉,保留可用署理。
那么,怎么完结呢?这就需求借助于一个叫作署理池的东西了。
接下来,本节就来介绍一下怎么建立一个高效易用的署理池。
1.准备工作
这儿署理池的存储需求借助于 Redis,因而需求额外装置它。总体来说,本节需求的环境如下:
-
需求装置并成功运转和衔接一个 Redis 数据库,Redis 运转在本地或许远端服务器都能够,只需能正常衔接就行,装置办法能够参考:setup.scrape.center/redis
-
装置好一些必要的库,包含 aiohttp、requests、redis-py、pyquery、Flask、loguru 等,装置命令如下:
pip3 install aiohttp requests redis pyquery flask loguru
做好了如上准备工作,咱们便能够开始完结或运转本节所讲的署理池了。
2.署理池的方针
咱们需求做到下面几个方针来完结易用高效的署理池。
署理池根本模块分为 4 部分:存储模块、获取模块、检测模块和接口模块,其功用如下:
- 存储模块:担任存储抓取下来的署理。首要要保证署理不重复,要标识署理的可用状况,还要动态实时处理每个署理,所以一种比较高效和方便的存储办法就是运用 Redis 的 Sorted Set,即有序调集。
- 获取模块:需求守时在各大署理网站抓取署理。署理既能够是免费揭露署理,也能够是付费署理,署理的形式都是 IP 加端口。此模块尽量从不同来历获取,尽量抓取高匿署理,抓取成功之后将可用署理保存到数据库中。
- 检测模块:需求守时检测数据库中的署理。这儿需求设置一个检测链接,最好是爬取哪个网站就检测哪个网站,这样更加有针对性。假如要做一个通用型的署理,能够设置百度等链接来检测。别的,咱们需求标识每一个署理的状况,如设置分数标识,100 分代表可用,分数越少代表越不可用。检测一次,假如署理可用,咱们能够将分数标识当即设置为 100 满分,也能够在原根底上加 1 分;假如署理不可用,能够将分数标识减 1 分,当分数减到一定阈值后,署理就直接从数据库移除。经过这样标识分数,咱们就能够辨别署理的可用状况,选用的时分会更有针对性。
- 接口模块:需求用 API 来供给对外服务的接口。其实咱们能够直接衔接数据库来取对应的数据,可是这样就需求知道数据库的衔接信息,并且要装备衔接,而比较安全和方便的办法就是供给一个 Web API 接口,咱们经过拜访接口即可拿到可用署理。别的,由于可用署理或许有多个,所以咱们能够设置一个随机回来某个可用署理的接口,这样就能保证每个可用署理都能够取到,完结负载均衡。
以上内容是规划署理的一些根本思路。接下来,咱们规划整体的架构,然后用代码完结署理池。
3. 署理池的架构
依据上文的描绘,署理池的架构如图所示。
图中所示的署理池分为 4 个模块:存储模块、获取模块、检测模块和接口模块:
- 存储模块运用 Redis 的有序调集,用来做署理的去重和状况标识,一起它也是中心模块和根底模块,用于将其他模块串联起来。
- 获取模块守时从署理网站获取署理,将获取的署理传递给存储模块,并保存到数据库。
- 检测模块守时经过存储模块获取一切署理,并对署理进行检测,依据不同的检测成果对署理设置不同的标识。
- 接口模块经过 Web API 供给服务接口,接口经过衔接数据库并经过 Web 形式回来可用的署理。
4.署理池的完结
接下来,咱们别离用代码来完结一下这 4 个模块。
注意:完好的署理池代码量较大,因而本节的代码咱们不再一步步跟着编写,最终去了解源码即可,源码地址为:github.com/Python3WebS…
存储模块
这儿咱们运用 Redis 的有序调集,调集中的每一个元素都是不重复的。关于署理池来说,调集中的元素就变成了一个个署理,也就是 IP 加端口的形式,如 60.207.237.111:8888
。别的,有序调集的每一个元素都有一个分数字段,分数是能够重复的,既能够是浮点数类型,也能够是整数类型。该调集会依据每一个元素的分数对调集进行排序,数值小的排在前面,数值大的排在后面,这样就能够完结调集元素的排序了。
关于署理池来说,这个分数能够作为判别一个署理是否可用的标志:100 为最高分,代表最可用;0 为最低分,代表最不可用。假如要获取可用署理,能够从署理池中随机获取分数最高的署理。注意这儿是随机,这样能够保证每个可用署理都会被调用到。
分数是咱们判别署理安稳性的重要标准。设置分数的规矩如下所示。
- 分数 100 为可用,检测器会守时循环检测每个署理的可用状况。一旦检测到有可用的署理,就当即置为 100;假如检测到不可用,就将分数减 1,分数减至 0 后署理移除。
- 新获取的署理的分数为 10,假如测验可行,分数当即置为 100,不可行则将分数减 1,分数减至 0 后署理移除。
这仅仅一种解决方案,当然或许还有更合理的方案。之所以设置此方案,有如下几个原因。
- 在检测到署理可用时,分数当即置为 100,这样能够保证一切可用署理有更大的时机被获取到。你或许会问,为什么不将分数加 1 而是直接将其设为最高值 100 呢?想象一下,有的署理是从各大免费揭露署理网站获取的,常常一个署理并没有那么安稳,平均 5 次恳求或许有 2 次成功,3 次失利。假如依照这种办法来设置分数,那么这个署理简直不或许到达一个高的分数,也就是说即便它有时是可用的,可是挑选的分数最高,那这样的署理简直不或许被取到。假如想追求署理安稳性,能够用上述办法,这种办法可保证分数最高的署理一定是最安稳可用的。所以,这儿咱们采取 “可用即设置 100” 的办法,保证只需可用的署理都能够被获取到。
- 在检测到署理不可用时,分数减 1,分数减至 0 后,署理移除。这样一个有用署理假如被移除,需求接二连三失利 100 次。也就是说,当一个可用署理测验了 100 次都失利了,就一直减分直到移除,一旦成功,就从头置回 100。测验时机越多,这个署理解救回来的时机越多,这样就不简单将曾经的一个可用署理丢掉,由于署理不可用的原因很或许是网络繁忙或许其他人用此署理恳求过分频繁,所以这儿将分数设为 100。
- 将新获取的署理的分数设置为 10,假如它不可用,分数就减 1,直到减到 0 就移除;假如署理可用,分数就置为 100。由于许多署理是从免费网站获取的,所以新获取的署理无效的份额十分高,或许可用的署理不足 10%。这儿咱们将分数设置为 10,检测的时机没有可用署理的 100 次那么多,这也能够恰当减少开支。
上述署理分数的设置思路纷歧定是最优思路,但据个人实测,它的实用性仍是比较强的。
这儿首要给出存储模块的完结代码,见 github.com/Python3WebS…
在代码中,咱们界说了一个类来操作数据库的有序调集,界说了一些办法来完结分数的设置、署理的获取等。其核心完结代码如下所示:
import redis
from proxypool.exceptions import PoolEmptyException
from proxypool.schemas.proxy import Proxy
from proxypool.setting import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_KEY, PROXY_SCORE_MAX, PROXY_SCORE_MIN, \
PROXY_SCORE_INIT
from random import choice
from typing import List
from loguru import logger
from proxypool.utils.proxy import is_valid_proxy, convert_proxy_or_proxies
REDIS_CLIENT_VERSION = redis.__version__
IS_REDIS_VERSION_2 = REDIS_CLIENT_VERSION.startswith('2.')
class RedisClient(object):
"""
redis connection client of proxypool
"""
def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, **kwargs):
"""
init redis client
:param host: redis host
:param port: redis port
:param password: redis password
"""
self.db = redis.StrictRedis(host=host, port=port, password=password, decode_responses=True, **kwargs)
def add(self, proxy: Proxy, score=PROXY_SCORE_INIT) -> int:
"""
add proxy and set it to init score
:param proxy: proxy, ip:port, like 8.8.8.8:88
:param score: int score
:return: result
"""
if not is_valid_proxy(f'{proxy.host}:{proxy.port}'):
logger.info(f'invalid proxy {proxy}, throw it')
return
if not self.exists(proxy):
if IS_REDIS_VERSION_2:
return self.db.zadd(REDIS_KEY, score, proxy.string())
return self.db.zadd(REDIS_KEY, {proxy.string(): score})
def random(self) -> Proxy:
"""
get random proxy
firstly try to get proxy with max score
if not exists, try to get proxy by rank
if not exists, raise error
:return: proxy, like 8.8.8.8:8
"""
# try to get proxy with max score
proxies = self.db.zrangebyscore(REDIS_KEY, PROXY_SCORE_MAX, PROXY_SCORE_MAX)
if len(proxies):
return convert_proxy_or_proxies(choice(proxies))
# else get proxy by rank
proxies = self.db.zrevrange(REDIS_KEY, PROXY_SCORE_MIN, PROXY_SCORE_MAX)
if len(proxies):
return convert_proxy_or_proxies(choice(proxies))
# else raise error
raise PoolEmptyException
def decrease(self, proxy: Proxy) -> int:
"""
decrease score of proxy, if small than PROXY_SCORE_MIN, delete it
:param proxy: proxy
:return: new score
"""
score = self.db.zscore(REDIS_KEY, proxy.string())
# current score is larger than PROXY_SCORE_MIN
if score and score > PROXY_SCORE_MIN:
logger.info(f'{proxy.string()} current score {score}, decrease 1')
if IS_REDIS_VERSION_2:
return self.db.zincrby(REDIS_KEY, proxy.string(), -1)
return self.db.zincrby(REDIS_KEY, -1, proxy.string())
# otherwise delete proxy
else:
logger.info(f'{proxy.string()} current score {score}, remove')
return self.db.zrem(REDIS_KEY, proxy.string())
def exists(self, proxy: Proxy) -> bool:
"""
if proxy exists
:param proxy: proxy
:return: if exists, bool
"""
return not self.db.zscore(REDIS_KEY, proxy.string()) is None
def max(self, proxy: Proxy) -> int:
"""
set proxy to max score
:param proxy: proxy
:return: new score
"""
logger.info(f'{proxy.string()} is valid, set to {PROXY_SCORE_MAX}')
if IS_REDIS_VERSION_2:
return self.db.zadd(REDIS_KEY, PROXY_SCORE_MAX, proxy.string())
return self.db.zadd(REDIS_KEY, {proxy.string(): PROXY_SCORE_MAX})
def count(self) -> int:
"""
get count of proxies
:return: count, int
"""
return self.db.zcard(REDIS_KEY)
def all(self) -> List[Proxy]:
"""
get all proxies
:return: list of proxies
"""
return convert_proxy_or_proxies(self.db.zrangebyscore(REDIS_KEY, PROXY_SCORE_MIN, PROXY_SCORE_MAX))
def batch(self, start, end) -> List[Proxy]:
"""
get batch of proxies
:param start: start index
:param end: end index
:return: list of proxies
"""
return convert_proxy_or_proxies(self.db.zrevrange(REDIS_KEY, start, end - 1))
if __name__ == '__main__':
conn = RedisClient()
result = conn.random()
print(result)
首要,咱们界说了一些常量,如 PROXY_SCORE_MAX
、PROXY_SCORE_MIN
、PROXY_SCORE_INIT
别离代表最大分数、最小分数、初始分数。REDIS_HOST
、REDIS_PORT
、REDIS_PASSWORD
别离代表了 Redis 的衔接信息,即地址、端口和密码。REDIS_KEY
是有序调集的键名,咱们能够经过它来获取署理存储所运用的有序调集。
RedisClient
这个类能够用来操作 Redis 的有序调集,其间界说了一些办法来对调集中的元素进行处理,它的首要功用如下所示。
-
__init__
办法是初始化的办法,其参数是 Redis 的衔接信息,默认的衔接信息现已界说为常量。咱们在__init__
办法中初始化了StrictRedis
类,建立了 Redis 衔接。 -
add
办法用于向数据库添加署理并设置分数,默认的分数是PROXY_SCORE_INIT
,也就是 10,回来成果是添加的成果。 -
random
办法是随机获取署理的办法。首要获取 100 分的署理,然后随机挑选一个回来。假如不存在 100 分的署理,则此办法依照排名来获取,选取前 100 名,然后随机挑选一个回来,不然抛出异常。 -
decrease
办法是在署理检测无效的时分设置分数减 1 的办法,署理传入后,此办法将署理的分数减 1,假如分数到达最低值,那么署理就删去。 -
exists
办法用于判别署理是否存在调集中。 -
max
办法用于将署理的分数设置为PROXY_SCORE_MAX
,即 100,也就是署理有用时的设置。 -
count
办法用于回来当时调集的元素个数。 -
all
办法回来一切的署理列表,供检测运用。
界说好这些办法后,咱们能够在后续的模块中调用此类来衔接和操作数据库。假如要获取随机可用的署理,只需求调用 random
办法即可,得到的就是随机的可用署理。
获取模块
获取模块首要是为了从各大网站抓取署理并调用存储模块进行保存,代码完结见 github.com/Python3WebS…
获取模块的逻辑相对简单,比方咱们能够界说一些抓取署理的办法,示例如下:
from proxypool.crawlers.base import BaseCrawler
from proxypool.schemas.proxy import Proxy
import re
MAX_PAGE = 5
BASE_URL = 'http://www.ip3366.net/free/?stype=1&page={page}'
class IP3366Crawler(BaseCrawler):
"""
ip3366 crawler, http://www.ip3366.net/
"""
urls = [BASE_URL.format(page=i) for i in range(1, 8)]
def parse(self, html):
"""
parse html file to get proxies
:return:
"""
ip_address = re.compile('<tr>\s*<td>(.*?)</td>\s*<td>(.*?)</td>')
# \s * 匹配空格,起到换行作用
re_ip_address = ip_address.findall(html)
for address, port in re_ip_address:
proxy = Proxy(host=address.strip(), port=int(port.strip()))
yield proxy
这儿界说了一个署理类 Crawler
,用来抓取某一网站的署理,这儿抓取的是 IP3366 的揭露署理,经过 parse
办法来解析页面的源码并构造一个个 Proxy 方针回来即可。
别的,在其父类 BaseCrawler
里边界说了通用的页面抓取办法,它能够读取子类里边界说的 urls
全局变量并进行爬取,然后调用子类的 parse
办法来解析页面,代码完结如下:
from retrying import retry
import requests
from loguru import logger
class BaseCrawler(object):
urls = []
@retry(stop_max_attempt_number=3, retry_on_result=lambda x: x is None)
def fetch(self, url, **kwargs):
try:
response = requests.get(url, **kwargs)
if response.status_code == 200:
return response.text
except requests.ConnectionError:
return
@logger.catch
def crawl(self):
"""
crawl main method
"""
for url in self.urls:
logger.info(f'fetching {url}')
html = self.fetch(url)
for proxy in self.parse(html):
logger.info(f'fetched proxy {proxy.string()} from {url}')
yield proxy
假如要扩展一个署理的 Crawler
,只需求集成 BaseCrawler
并完结 parse
办法即可,扩展性较好。
因而,这一个个的 Crawler
就能够针对各个不同的署理网站进行署理的抓取。最终,有一个一致的办法将 Crawler
汇总起来,遍历调用即可。
怎么汇总呢?这儿咱们能够检测代码只需界说有 BaseCrawler
的子类就算一个有用的署理 Crawler
,能够直接经过遍历 Python 文件包的办法来获取,代码完结如下:
import pkgutil
from .base import BaseCrawler
import inspect
# load classes subclass of BaseCrawler
classes = []
for loader, name, is_pkg in pkgutil.walk_packages(__path__):
module = loader.find_module(name).load_module(name)
for name, value in inspect.getmembers(module):
globals()[name] = value
if inspect.isclass(value) and issubclass(value, BaseCrawler) and value is not BaseCrawler:
classes.append(value)
__all__ = __ALL__ = classes
这儿咱们调用了 walk_packages
办法,遍历了整个 crawlers 模块下的类,并判别它是 BaseCrawler
的子类,那就将其添加到成果中并回来。
最终,只需将 classes
遍历并顺次实例化,调用其 crawl
办法即可完结署理的爬取和提取,代码完结见 github.com/Python3WebS…
检测模块
咱们现已成功将各个网站的署理获取下来了,现在需求一个检测模块来对一切署理进行多轮检测。署理检测可用,分数就设置为 100,署理不可用,分数就减 1,这样能够实时改变每个署理的可用状况。假如要获取有用署理,只需求获取分数高的署理即可。
由于署理的数量十分多,为了进步署理的检测功率,这儿运用异步恳求库 aiohttp 来检测。
requests 作为一个同步恳求库,咱们在宣布一个恳求之后,程序需求等候网页加载完结之后才能持续履行。也就是这个过程会阻塞等候呼应,假如服务器呼应十分慢,比方一个恳求等候十几秒,那么咱们运用 requests 完结一个恳求就会需求十几秒的时刻,程序也不会持续往下履行,而在这十几秒的时刻里,程序其实完全能够去做其他的事情,比方调度其他的恳求或许进行网页解析等。
关于呼应速度比较快的网站来说,requests 同步恳求和 aiohttp 异步恳求的作用距离没那么大。可关于检测署理来说,检测一个署理一般需求十多秒甚至几十秒的时刻,这时分运用 aiohttp 异步恳求库的优势就大大体现出来了,功率或许会进步几十倍不止。
所以,咱们的署理检测运用异步恳求库 aiohttp,完结示例如下所示:
import asyncio
import aiohttp
from loguru import logger
from proxypool.schemas import Proxy
from proxypool.storages.redis import RedisClient
from proxypool.setting import TEST_TIMEOUT, TEST_BATCH, TEST_URL, TEST_VALID_STATUS
from aiohttp import ClientProxyConnectionError, ServerDisconnectedError, ClientOSError, ClientHttpProxyError
from asyncio import TimeoutError
EXCEPTIONS = (
ClientProxyConnectionError,
ConnectionRefusedError,
TimeoutError,
ServerDisconnectedError,
ClientOSError,
ClientHttpProxyError
)
class Tester(object):
"""
tester for testing proxies in queue
"""
def __init__(self):
"""
init redis
"""
self.redis = RedisClient()
self.loop = asyncio.get_event_loop()
async def test(self, proxy: Proxy):
"""
test single proxy
:param proxy: Proxy object
:return:
"""
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
try:
logger.debug(f'testing {proxy.string()}')
async with session.get(TEST_URL, proxy=f'http://{proxy.string()}', timeout=TEST_TIMEOUT,
allow_redirects=False) as response:
if response.status in TEST_VALID_STATUS:
self.redis.max(proxy)
logger.debug(f'proxy {proxy.string()} is valid, set max score')
else:
self.redis.decrease(proxy)
logger.debug(f'proxy {proxy.string()} is invalid, decrease score')
except EXCEPTIONS:
self.redis.decrease(proxy)
logger.debug(f'proxy {proxy.string()} is invalid, decrease score')
@logger.catch
def run(self):
"""
test main method
:return:
"""
# event loop of aiohttp
logger.info('stating tester...')
count = self.redis.count()
logger.debug(f'{count} proxies to test')
for i in range(0, count, TEST_BATCH):
# start end end offset
start, end = i, min(i + TEST_BATCH, count)
logger.debug(f'testing proxies from {start} to {end} indices')
proxies = self.redis.batch(start, end)
tasks = [self.test(proxy) for proxy in proxies]
# run tasks using event loop
self.loop.run_until_complete(asyncio.wait(tasks))
if __name__ == '__main__':
tester = Tester()
tester.run()
这儿界说了一个类 Tester
,__init__
办法中建立了一个 RedisClient
方针,供该方针中其他办法运用。接下来,界说了一个 test
办法,这个办法用来检测单个署理的可用状况,其参数就是被检测的署理。注意,test
办法前面加了 async
关键词,这代表这个办法是异步的。办法内部首要创建了 aiohttp 的 ClientSession
方针,能够直接调用该方针的 get
办法来拜访页面。
测验链接在这儿界说为常量 TEST_URL
。假如针对某个网站有抓取需求,建议将 TEST_URL
设置为方针网站的地址,由于在抓取过程中,署理自身或许是可用的,可是该署理的 IP 现已被方针网站封掉了。例如,某些署理能够正常拜访百度等页面,可是对知乎来说或许就被封了,所以咱们能够将 TEST_URL
设置为知乎的某个页面的链接。当恳求失利、署理被封时,分数自然会减下来,失效的署理就不会被取到了。
假如想做一个通用的署理池,则不需求专门设置 TEST_URL
,既能够将其设置为一个不会封 IP 的网站,也能够设置为百度这类呼应安稳的网站。
咱们还界说了 TEST_VALID_STATUS
变量,这个变量是一个列表形式,包含了正常的状况码,如能够界说成 [200]
。当然,某些方针网站或许会出现其他的状况码,能够自行装备。
程序在获取呼应后需求判别呼应的状况,假如状况码在 TEST_VALID_STATUS
列表里,则代表署理可用,能够调用 RedisClient
的 max
办法将署理分数设为 100,不然调用 decrease
办法将署理分数减 1,假如出现异常,也相同将署理分数减 1。
别的,咱们设置了批量测验的最大值 TEST_BATCH
,也就是一批测验最多 TEST_BATCH
个,这能够防止署理池过大时一次性测验悉数署理导致内存开支过大的问题。当然,也能够用信号量来完结并发操控。
随后,在 run
办法里边获取了一切的署理列表,运用 aiohttp 分配任务,发动运转。这样在不断的运转过程中,署理池中无效署理的分数会一直被减 1,直至被清除,有用的署理则会一直坚持 100 分,供随时取用。
这样测验模块的逻辑就完结了。
接口模块
经过上述 3 个模块,咱们现已能够做到署理的获取、检测和更新,数据库就会以有序调集的形式存储各个署理及其对应的分数,分数 100 代表可用,分数越小代表越不可用。
可是咱们怎样方便地获取可用署理呢?能够用 RedisClient
类直接衔接 Redis,然后调用 random
办法。这样做没问题,功率很高,可是会有几个弊端。
- 假如其他人运用这个署理池,他需求知道 Redis 衔接的用户名和密码信息,这样很不安全。
- 假如署理池需求布置在长途服务器上运转,而长途服务器的 Redis 只允许本地衔接,那么咱们就不能长途直连 Redis 来获取署理。
- 假如爬虫所在的主机没有衔接 Redis 模块,或许爬虫不是由 Python 语言编写的,那么咱们就无法运用
RedisClient
来获取署理。 - 假如
RedisClient
类或许数据库结构有更新,那么爬虫端有必要同步这些更新,这样十分麻烦。
综上考虑,为了使署理池能够作为一个独立服务运转,咱们最好增加一个接口模块,并以 Web API 的形式暴露可用署理。
这样一来,获取署理只需求恳求接口即可,以上的几个缺点也能够防止。
咱们运用一个比较轻量级的库 Flask 来完结这个接口模块,完结示例如下所示:
from flask import Flask, g
from proxypool.storages.redis import RedisClient
from proxypool.setting import API_HOST, API_PORT, API_THREADED
__all__ = ['app']
app = Flask(__name__)
def get_conn():
"""
get redis client object
:return:
"""
if not hasattr(g, 'redis'):
g.redis = RedisClient()
return g.redis
@app.route('/')
def index():
"""
get home page, you can define your own templates
:return:
"""
return '<h2>Welcome to Proxy Pool System</h2>'
@app.route('/random')
def get_proxy():
"""
get a random proxy
:return: get a random proxy
"""
conn = get_conn()
return conn.random().string()
@app.route('/count')
def get_count():
"""
get the count of proxies
:return: count, int
"""
conn = get_conn()
return str(conn.count())
if __name__ == '__main__':
app.run(host=API_HOST, port=API_PORT, threaded=API_THREADED)
这儿咱们声明晰一个 Flask 方针,界说了 3 个接口,别离是主页、随机署理页和获取数量页。
运转之后,Flask 会发动一个 Web 服务,咱们只需求拜访对应的接口即可获取到可用署理。
调度模块
调度模块就是调用上面所界说的 3 个模块,将这 3 个模块经过多进程的形式运转起来,示例如下所示:
import time
import multiprocessing
from proxypool.processors.server import app
from proxypool.processors.getter import Getter
from proxypool.processors.tester import Tester
from proxypool.setting import CYCLE_GETTER, CYCLE_TESTER, API_HOST, API_THREADED, API_PORT, ENABLE_SERVER, \
ENABLE_GETTER, ENABLE_TESTER, IS_WINDOWS
from loguru import logger
if IS_WINDOWS:
multiprocessing.freeze_support()
tester_process, getter_process, server_process = None, None, None
class Scheduler():
"""
scheduler
"""
def run_tester(self, cycle=CYCLE_TESTER):
"""
run tester
"""
if not ENABLE_TESTER:
logger.info('tester not enabled, exit')
return
tester = Tester()
loop = 0
while True:
logger.debug(f'tester loop {loop} start...')
tester.run()
loop += 1
time.sleep(cycle)
def run_getter(self, cycle=CYCLE_GETTER):
"""
run getter
"""
if not ENABLE_GETTER:
logger.info('getter not enabled, exit')
return
getter = Getter()
loop = 0
while True:
logger.debug(f'getter loop {loop} start...')
getter.run()
loop += 1
time.sleep(cycle)
def run_server(self):
"""
run server for api
"""
if not ENABLE_SERVER:
logger.info('server not enabled, exit')
return
app.run(host=API_HOST, port=API_PORT, threaded=API_THREADED)
def run(self):
global tester_process, getter_process, server_process
try:
logger.info('starting proxypool...')
if ENABLE_TESTER:
tester_process = multiprocessing.Process(target=self.run_tester)
logger.info(f'starting tester, pid {tester_process.pid}...')
tester_process.start()
if ENABLE_GETTER:
getter_process = multiprocessing.Process(target=self.run_getter)
logger.info(f'starting getter, pid{getter_process.pid}...')
getter_process.start()
if ENABLE_SERVER:
server_process = multiprocessing.Process(target=self.run_server)
logger.info(f'starting server, pid{server_process.pid}...')
server_process.start()
tester_process.join()
getter_process.join()
server_process.join()
except KeyboardInterrupt:
logger.info('received keyboard interrupt signal')
tester_process.terminate()
getter_process.terminate()
server_process.terminate()
finally:
# must call join method before calling is_alive
tester_process.join()
getter_process.join()
server_process.join()
logger.info(f'tester is {"alive" if tester_process.is_alive() else "dead"}')
logger.info(f'getter is {"alive" if getter_process.is_alive() else "dead"}')
logger.info(f'server is {"alive" if server_process.is_alive() else "dead"}')
logger.info('proxy terminated')
if __name__ == '__main__':
scheduler = Scheduler()
scheduler.run()
3 个常量 ENABLE_TESTER
、ENABLE_GETTER
和 ENABLE_SERVER
都是布尔类型,表明测验模块、获取模块和接口模块的开关,假如都为 True
,则代表模块敞开。
发动入口是 run
办法,这个办法别离判别 3 个模块的开关。假如开关敞开,发动时程序就新建一个 Process 进程,设置好发动方针,然后调用 start
办法运转,这样 3 个进程就能够并行履行,互不干扰。
3 个调度办法的结构也十分明晰。比方,run_tester
办法用来调度测验模块。首要声明一个 Tester 方针,然后进入死循环不断循环调用其 run
办法,履行完一轮之后就休眠一段时刻,休眠结束之后从头再履行。这儿休眠时刻也界说为一个常量,如 20 秒,即每隔 20 秒进行一次署理检测。
最终,只需求调用 Scheduler
的 run
办法即可发动整个署理池。
以上内容就是整个署理池的架构和相应完结逻辑。
5.运转
接下来,咱们将代码整合一下,将署理运转起来,运转之后的输出成果如下所示:
2020-04-13 02:52:06.510 | INFO | proxypool.storages.redis:decrease:73 - 60.186.146.193:9000 current score 10.0, decrease 1
2020-04-13 02:52:06.517 | DEBUG | proxypool.processors.tester:test:52 - proxy 60.186.146.193:9000 is invalid, decrease score
2020-04-13 02:52:06.524 | INFO | proxypool.storages.redis:decrease:73 - 60.186.151.147:9000 current score 10.0, decrease 1
2020-04-13 02:52:06.532 | DEBUG | proxypool.processors.tester:test:52 - proxy 60.186.151.147:9000 is invalid, decrease score
2020-04-13 02:52:07.159 | INFO | proxypool.storages.redis:max:96 - 60.191.11.246:3128 is valid, set to 100
2020-04-13 02:52:07.167 | DEBUG | proxypool.processors.tester:test:46 - proxy 60.191.11.246:3128 is valid, set max score
2020-04-13 02:52:17.271 | INFO | proxypool.storages.redis:decrease:73 - 59.62.7.130:9000 current score 10.0, decrease 1
2020-04-13 02:52:17.280 | DEBUG | proxypool.processors.tester:test:52 - proxy 59.62.7.130:9000 is invalid, decrease score
2020-04-13 02:52:17.288 | INFO | proxypool.storages.redis:decrease:73 - 60.167.103.74:1133 current score 10.0, decrease 1
2020-04-13 02:52:17.295 | DEBUG | proxypool.processors.tester:test:52 - proxy 60.167.103.74:1133 is invalid, decrease score
2020-04-13 02:52:17.302 | INFO | proxypool.storages.redis:decrease:73 - 60.162.71.113:9000 current score 10.0, decrease 1
2020-04-13 02:52:17.309 | DEBUG | proxypool.processors.tester:test:52 - proxy 60.162.71.113:9000 is invalid, decrease score
以上是署理池的操控台输出,能够看到这儿将可用署理设置为 100,不可用署理分数减 1。
接下来,咱们再打开浏览器,当时装备运转在 5555 端口,所以打开 http://127.0.0.1:5555 即可看到其主页,如图所示。
再拜访 http://127.0.0.1:5555/random,即可获取随机可用署理,如图 9-3 所示。
只需求拜访此接口,即可获取一个随机可用署理,这十分方便。
获取署理的代码如下所示:
import requests
PROXY_POOL_URL = 'http://localhost:5555/random'
def get_proxy():
try:
response = requests.get(PROXY_POOL_URL)
if response.status_code == 200:
return response.text
except ConnectionError:
return None
这样便能够获取到一个随机署理了。它是字符串类型,此署理能够依照上一节所示的办法设置,如 requests 的运用办法如下所示:
import requests
proxy = get_proxy()
proxies = {
'http': 'http://' + proxy,
'https': 'https://' + proxy,
}
try:
response = requests.get('http://httpbin.org/get', proxies=proxies)
print(response.text)
except requests.exceptions.ConnectionError as e:
print('Error', e.args)
有了署理池之后,再取出署理即可有用防止 IP 被封禁的状况。
6.总结
本节咱们学习了一个署理池的规划思路和完结方案,有了这个署理池,咱们就能够实时获取一些可用的署理了。相对之前的实战事例来说,整个署理池的代码量和逻辑复杂了比较多,建议能够好好理解和消化一下。
本节的代码地址为 github.com/Python3WebS…,代码库中还供给了基于 Docker 和 Kubernetes 的运转和布置操作,能够协助咱们更加方便地运转署理池,一起本书后文也会介绍署理池的布置办法。
十分感谢你的阅览,更多精彩内容,请重视我的大众号「进击的 Coder」和「崔庆才丨静觅」。