90 lines
2.7 KiB
Python
90 lines
2.7 KiB
Python
|
# -*- coding: utf-8 -*-
|
||
|
# @Author : relakkes@gmail.com
|
||
|
# @Time : 2023/12/2 13:45
|
||
|
# @Desc : ip代理池实现
|
||
|
import random
|
||
|
from typing import List
|
||
|
|
||
|
import httpx
|
||
|
from tenacity import retry, stop_after_attempt, wait_fixed
|
||
|
|
||
|
from tools import utils
|
||
|
|
||
|
from .proxy_ip_provider import IpInfoModel, IpProxy
|
||
|
|
||
|
|
||
|
class ProxyIpPool:
|
||
|
def __init__(self, ip_pool_count: int, enable_validate_ip: bool) -> None:
|
||
|
self.valid_ip_url = "https://httpbin.org/ip" # 验证 IP 是否有效的地址
|
||
|
self.ip_pool_count = ip_pool_count
|
||
|
self.enable_validate_ip = enable_validate_ip
|
||
|
self.proxy_list: List[IpInfoModel] = []
|
||
|
|
||
|
async def load_proxies(self) -> None:
|
||
|
"""
|
||
|
从 HTTP 代理商获取 IP 列表
|
||
|
:return:
|
||
|
"""
|
||
|
self.proxy_list = await IpProxy.get_proxies(self.ip_pool_count)
|
||
|
|
||
|
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
|
||
|
async def is_valid_proxy(self, proxy: IpInfoModel) -> bool:
|
||
|
"""
|
||
|
验证代理IP是否有效
|
||
|
:param proxy:
|
||
|
:return:
|
||
|
"""
|
||
|
utils.logger.info(f"[ProxyIpPool.is_valid_proxy] testing {proxy.ip} is it valid ")
|
||
|
try:
|
||
|
httpx_proxy = f"{proxy.protocol}{proxy.ip}:{proxy.port}"
|
||
|
proxy_auth = httpx.BasicAuth(proxy.user, proxy.password)
|
||
|
async with httpx.AsyncClient(proxies={proxy.protocol: httpx_proxy}, auth=proxy_auth) as client:
|
||
|
response = await client.get(self.valid_ip_url)
|
||
|
if response.status_code == 200:
|
||
|
return True
|
||
|
else:
|
||
|
return False
|
||
|
except Exception as e:
|
||
|
utils.logger.info(f"[ProxyIpPool.is_valid_proxy] testing {proxy.ip} err: {e}")
|
||
|
raise e
|
||
|
|
||
|
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
|
||
|
async def get_proxy(self) -> IpInfoModel:
|
||
|
"""
|
||
|
从代理池中随机提取一个代理IP
|
||
|
:return:
|
||
|
"""
|
||
|
if len(self.proxy_list) == 0:
|
||
|
await self.reload_proxies()
|
||
|
|
||
|
proxy = random.choice(self.proxy_list)
|
||
|
if self.enable_validate_ip:
|
||
|
if not await self.is_valid_proxy(proxy):
|
||
|
raise Exception("[ProxyIpPool.get_proxy] current ip invalid and again get it")
|
||
|
self.proxy_list.remove(proxy)
|
||
|
return proxy
|
||
|
|
||
|
async def reload_proxies(self):
|
||
|
"""
|
||
|
# 重新加载代理池
|
||
|
:return:
|
||
|
"""
|
||
|
self.proxy_list = []
|
||
|
await self.load_proxies()
|
||
|
|
||
|
|
||
|
async def create_ip_pool(ip_pool_count: int, enable_validate_ip) -> ProxyIpPool:
|
||
|
"""
|
||
|
创建 IP 代理池
|
||
|
:param ip_pool_count:
|
||
|
:param enable_validate_ip:
|
||
|
:return:
|
||
|
"""
|
||
|
pool = ProxyIpPool(ip_pool_count, enable_validate_ip)
|
||
|
await pool.load_proxies()
|
||
|
return pool
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
pass
|