refactor: IP代理池重构

This commit is contained in:
Relakkes 2024-04-05 10:44:05 +08:00
parent d0c578c2bf
commit dde3c0429e
11 changed files with 178 additions and 244 deletions

View File

@ -12,6 +12,9 @@ ENABLE_IP_PROXY = False
# 代理IP池数量 # 代理IP池数量
IP_PROXY_POOL_COUNT = 2 IP_PROXY_POOL_COUNT = 2
# 代理IP提供商名称
IP_PROXY_PROVIDER_NAME = "jishuhttp"
# 设置为True不会打开浏览器无头浏览器设置False会打开一个浏览器小红书如果一直扫码登录不通过打开浏览器手动过一下滑动验证码 # 设置为True不会打开浏览器无头浏览器设置False会打开一个浏览器小红书如果一直扫码登录不通过打开浏览器手动过一下滑动验证码
HEADLESS = True HEADLESS = True

View File

@ -1,4 +1,5 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com # @Author : relakkes@gmail.com
# @Time : 2023/12/2 14:37 # @Time : 2023/12/2 14:37
# @Desc : # @Desc : IP代理池入口
from .base_proxy import *

63
proxy/base_proxy.py Normal file
View File

@ -0,0 +1,63 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2023/12/2 11:18
# @Desc : 爬虫 IP 获取实现
# @Url : 现在实现了极速HTTP的接口官网地址https://www.jisuhttp.com/?pl=mAKphQ&plan=ZY&kd=Yang
import json
from abc import ABC, abstractmethod
from typing import Dict, List
import redis
import config
from tools import utils
from .types import IpInfoModel
class IpGetError(Exception):
""" ip get error"""
class ProxyProvider(ABC):
@abstractmethod
async def get_proxies(self, num: int) -> List[Dict]:
"""
获取 IP 的抽象方法不同的 HTTP 代理商需要实现该方法
:param num: 提取的 IP 数量
:return:
"""
pass
class RedisDbIpCache:
def __init__(self):
self.redis_client = redis.Redis(host=config.REDIS_DB_HOST, password=config.REDIS_DB_PWD)
def set_ip(self, ip_key: str, ip_value_info: str, ex: int):
"""
设置IP并带有过期时间到期之后由 redis 负责删除
:param ip_key:
:param ip_value_info:
:param ex:
:return:
"""
self.redis_client.set(name=ip_key, value=ip_value_info, ex=ex)
def load_all_ip(self, proxy_brand_name: str) -> List[IpInfoModel]:
"""
redis 中加载所有还未过期的 IP 信息
:param proxy_brand_name: 代理商名称
:return:
"""
all_ip_list: List[IpInfoModel] = []
all_ip_keys: List[bytes] = self.redis_client.keys(pattern=f"{proxy_brand_name}_*")
try:
for ip_key in all_ip_keys:
ip_value = self.redis_client.get(ip_key)
if not ip_value:
continue
all_ip_list.append(IpInfoModel(**json.loads(ip_value)))
except Exception as e:
utils.logger.error("[RedisDbIpCache.load_all_ip] get ip err from redis db", e)
return all_ip_list

View File

@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2024/4/5 10:13
# @Desc :
from .jishu_http_proxy import new_jisu_http_proxy
from .kuaidl_proxy import new_kuai_daili_proxy

View File

@ -1,87 +1,22 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com # @Author : relakkes@gmail.com
# @Time : 2023/12/2 11:18 # @Time : 2024/4/5 09:32
# @Desc : 爬虫 IP 获取实现 # @Desc : 极速HTTP代理提供类实现,官网地址https://www.jisuhttp.com?pl=zG3Jna
# @Url : 现在实现了极速HTTP的接口官网地址https://www.jisuhttp.com/?pl=mAKphQ&plan=ZY&kd=Yang
import asyncio
import json
import os import os
from abc import ABC, abstractmethod from typing import Dict, List
from typing import Dict, List, Optional
from urllib.parse import urlencode from urllib.parse import urlencode
import httpx import httpx
import redis
from pydantic import BaseModel, Field
import config from proxy import IpGetError, ProxyProvider, RedisDbIpCache
from proxy.types import IpInfoModel
from tools import utils from tools import utils
class IpGetError(Exception):
""" ip get error"""
class IpInfoModel(BaseModel):
"""Unified IP model"""
ip: str = Field(title="ip")
port: int = Field(title="端口")
user: str = Field(title="IP代理认证的用户名")
protocol: str = Field(default="https://", title="代理IP的协议")
password: str = Field(title="IP代理认证用户的密码")
expired_time_ts: Optional[int] = Field(title="IP 过期时间")
class ProxyProvider(ABC):
@abstractmethod
async def get_proxies(self, num: int) -> List[Dict]:
"""
获取 IP 的抽象方法不同的 HTTP 代理商需要实现该方法
:param num: 提取的 IP 数量
:return:
"""
pass
class RedisDbIpCache:
def __init__(self):
self.redis_client = redis.Redis(host=config.REDIS_DB_HOST, password=config.REDIS_DB_PWD)
def set_ip(self, ip_key: str, ip_value_info: str, ex: int):
"""
设置IP并带有过期时间到期之后由 redis 负责删除
:param ip_key:
:param ip_value_info:
:param ex:
:return:
"""
self.redis_client.set(name=ip_key, value=ip_value_info, ex=ex)
def load_all_ip(self, proxy_brand_name: str) -> List[IpInfoModel]:
"""
redis 中加载所有还未过期的 IP 信息
:param proxy_brand_name: 代理商名称
:return:
"""
all_ip_list: List[IpInfoModel] = []
all_ip_keys: List[str] = self.redis_client.keys(pattern=f"{proxy_brand_name}_*")
try:
for ip_key in all_ip_keys:
ip_value = self.redis_client.get(ip_key)
if not ip_value:
continue
all_ip_list.append(IpInfoModel(**json.loads(ip_value)))
except Exception as e:
utils.logger.error("[RedisDbIpCache.load_all_ip] get ip err from redis db", e)
return all_ip_list
class JiSuHttpProxy(ProxyProvider): class JiSuHttpProxy(ProxyProvider):
def __init__(self, key: str, crypto: str, time_validity_period: int): def __init__(self, key: str, crypto: str, time_validity_period: int):
""" """
极速HTTP 代理IP实现 极速HTTP 代理IP实现
官网地址https://www.jisuhttp.com/?pl=mAKphQ&plan=ZY&kd=Yang
:param key: 提取key值 (去官网注册后获取) :param key: 提取key值 (去官网注册后获取)
:param crypto: 加密签名 (去官网注册后获取) :param crypto: 加密签名 (去官网注册后获取)
""" """
@ -131,7 +66,7 @@ class JiSuHttpProxy(ProxyProvider):
expired_time_ts=utils.get_unix_time_from_time_str(ip_item.get("expire")) expired_time_ts=utils.get_unix_time_from_time_str(ip_item.get("expire"))
) )
ip_key = f"JISUHTTP_{ip_info_model.ip}_{ip_info_model.port}_{ip_info_model.user}_{ip_info_model.password}" ip_key = f"JISUHTTP_{ip_info_model.ip}_{ip_info_model.port}_{ip_info_model.user}_{ip_info_model.password}"
ip_value = ip_info_model.model_dump_json() ip_value = ip_info_model.json()
ip_infos.append(ip_info_model) ip_infos.append(ip_info_model)
self.ip_cache.set_ip(ip_key, ip_value, ex=ip_info_model.expired_time_ts - current_ts) self.ip_cache.set_ip(ip_key, ip_value, ex=ip_info_model.expired_time_ts - current_ts)
else: else:
@ -139,13 +74,14 @@ class JiSuHttpProxy(ProxyProvider):
return ip_cache_list + ip_infos return ip_cache_list + ip_infos
IpProxy = JiSuHttpProxy( def new_jisu_http_proxy() -> JiSuHttpProxy:
"""
构造极速HTTP实例
Returns:
"""
return JiSuHttpProxy(
key=os.getenv("jisu_key", ""), # 通过环境变量的方式获取极速HTTPIP提取key值 key=os.getenv("jisu_key", ""), # 通过环境变量的方式获取极速HTTPIP提取key值
crypto=os.getenv("jisu_crypto", ""), # 通过环境变量的方式获取极速HTTPIP提取加密签名 crypto=os.getenv("jisu_crypto", ""), # 通过环境变量的方式获取极速HTTPIP提取加密签名
time_validity_period=30 # 30分钟最长时效 time_validity_period=30 # 30分钟最长时效
) )
if __name__ == '__main__':
# 每一次提取都要消耗 IP 数量,谨慎使用
_ip_infos = asyncio.run(IpProxy.get_proxies(1))
print(_ip_infos)

View File

@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2024/4/5 09:43
# @Desc : 快代理HTTP实现官方文档https://www.kuaidaili.com/?ref=ldwkjqipvz6c
from typing import Dict, List
from proxy import IpGetError, IpInfoModel, ProxyProvider, RedisDbIpCache
class KuaiDaiLiProxy(ProxyProvider):
async def get_proxies(self, num: int) -> List[Dict]:
pass
def new_kuai_daili_proxy() -> KuaiDaiLiProxy:
"""
构造快代理HTTP实例
Returns:
"""
return KuaiDaiLiProxy()

View File

@ -1,137 +0,0 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2023/12/2 11:18
# @Desc : IP 和 手机号 一一配对的账号代理池
from typing import List, Optional, Set, Tuple
import config
class PhonePool:
"""phone pool class"""
def __init__(self) -> None:
self.phones: List[str] = []
self.used_phones: Set[str] = set()
def add_phone(self, phone: str) -> bool:
"""add phone to the pool"""
if phone not in self.phones:
self.phones.append(phone)
return True
return False
def remove_phone(self, phone: str) -> bool:
"""remove phone from the pool"""
if phone in self.used_phones:
self.phones.remove(phone)
self.used_phones.remove(phone)
return True
return False
def get_phone(self) -> Optional[str]:
"""get phone and mark as used"""
if self.phones:
left_phone = self.phones.pop(0)
self.used_phones.add(left_phone)
return left_phone
return None
def clear(self):
"""clear phone pool"""
self.phones = []
self.used_phones = set()
class IPPool:
def __init__(self) -> None:
self.ips: List[str]= []
self.used_ips: Set[str]= set()
def add_ip(self, ip):
"""添加ip"""
if ip not in self.ips:
self.ips.append(ip)
return True
return False
def remove_ip(self, ip: str) -> bool:
"""remove ip"""
if ip in self.used_ips:
self.ips.remove(ip)
self.used_ips.remove(ip)
return True
return False
def get_ip(self) -> Optional[str]:
"""get ip and mark as used"""
if self.ips:
left_ips = self.ips.pop(0)
self.used_ips.add(left_ips)
return left_ips
return None
def clear(self):
""" clear ip pool"""
self.ips = []
self.used_ips = set()
class AccountPool:
"""account pool class"""
def __init__(self):
self.phone_pool = PhonePool()
self.ip_pool = IPPool()
def add_account(self, phone: str, ip: str) -> bool:
"""add account to pool with phone and ip"""
if self.phone_pool.add_phone(phone) and self.ip_pool.add_ip(ip):
return True
return False
def remove_account(self, phone: str, ip: str) -> bool:
"""remove account from pool """
if self.phone_pool.remove_phone(phone) and self.ip_pool.remove_ip(ip):
return True
return False
def get_account(self) -> Tuple[str, str]:
"""get account if no account, reload account pool"""
phone = self.phone_pool.get_phone()
ip = self.ip_pool.get_ip()
if not phone or not ip:
reload_account_pool(self)
return self.get_account()
return phone, ip
def clear_account(self):
"""clear account pool"""
self.phone_pool.clear()
self.ip_pool.clear()
def reload_account_pool(apo: AccountPool):
"""reload account pool"""
apo.clear_account()
for phone, ip in zip(config.PHONE_LIST, config.IP_PROXY_LIST):
apo.add_account(phone, ip)
def create_account_pool() -> AccountPool:
"""create account pool"""
apo = AccountPool()
reload_account_pool(apo=apo)
return apo
if __name__ == '__main__':
import time
ac_pool = create_account_pool()
p, i = ac_pool.get_account()
while p:
print(f"get phone:{p}, ip proxy:{i} from account pool")
p, i = ac_pool.get_account()
time.sleep(1)

View File

@ -2,41 +2,50 @@
# @Author : relakkes@gmail.com # @Author : relakkes@gmail.com
# @Time : 2023/12/2 13:45 # @Time : 2023/12/2 13:45
# @Desc : ip代理池实现 # @Desc : ip代理池实现
import json
import pathlib
import random import random
from typing import List from typing import Dict, List
import httpx import httpx
from tenacity import retry, stop_after_attempt, wait_fixed from tenacity import retry, stop_after_attempt, wait_fixed
import config
from proxy.providers import new_jisu_http_proxy, new_kuai_daili_proxy
from tools import utils from tools import utils
from .proxy_ip_provider import IpInfoModel, IpProxy from .base_proxy import ProxyProvider
from .types import IpInfoModel, ProviderNameEnum
class ProxyIpPool: class ProxyIpPool:
def __init__(self, ip_pool_count: int, enable_validate_ip: bool) -> None: def __init__(self, ip_pool_count: int, enable_validate_ip: bool, ip_provider: ProxyProvider) -> None:
"""
Args:
ip_pool_count:
enable_validate_ip:
ip_provider:
"""
self.valid_ip_url = "https://httpbin.org/ip" # 验证 IP 是否有效的地址 self.valid_ip_url = "https://httpbin.org/ip" # 验证 IP 是否有效的地址
self.ip_pool_count = ip_pool_count self.ip_pool_count = ip_pool_count
self.enable_validate_ip = enable_validate_ip self.enable_validate_ip = enable_validate_ip
self.proxy_list: List[IpInfoModel] = [] self.proxy_list: List[IpInfoModel] = []
self.ip_provider: ProxyProvider = ip_provider
async def load_proxies(self) -> None: async def load_proxies(self) -> None:
""" """
解析 加载IP代理
:return: Returns:
"""
self.proxy_list = await IpProxy.get_proxies(self.ip_pool_count)
async def is_valid_proxy(self, proxy: IpInfoModel) -> bool: """
self.proxy_list = await self.ip_provider.get_proxies(self.ip_pool_count)
async def _is_valid_proxy(self, proxy: IpInfoModel) -> bool:
""" """
验证代理IP是否有效 验证代理IP是否有效
:param proxy: :param proxy:
:return: :return:
""" """
utils.logger.info(f"[ProxyIpPool._is_valid_proxy] testing {proxy.ip} is it valid ")
utils.logger.info(f"[ProxyIpPool.is_valid_proxy] testing {proxy.ip} is it valid ")
try: try:
httpx_proxy = { httpx_proxy = {
f"{proxy.protocol}": f"http://{proxy.user}:{proxy.password}@{proxy.ip}:{proxy.port}" f"{proxy.protocol}": f"http://{proxy.user}:{proxy.password}@{proxy.ip}:{proxy.port}"
@ -48,7 +57,7 @@ class ProxyIpPool:
else: else:
return False return False
except Exception as e: except Exception as e:
utils.logger.info(f"[ProxyIpPool.is_valid_proxy] testing {proxy.ip} err: {e}") utils.logger.info(f"[ProxyIpPool._is_valid_proxy] testing {proxy.ip} err: {e}")
raise e raise e
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1)) @retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
@ -58,16 +67,16 @@ class ProxyIpPool:
:return: :return:
""" """
if len(self.proxy_list) == 0: if len(self.proxy_list) == 0:
await self.reload_proxies() await self._reload_proxies()
proxy = random.choice(self.proxy_list) proxy = random.choice(self.proxy_list)
if self.enable_validate_ip: if self.enable_validate_ip:
if not await self.is_valid_proxy(proxy): if not await self._is_valid_proxy(proxy):
raise Exception("[ProxyIpPool.get_proxy] current ip invalid and again get it") raise Exception("[ProxyIpPool.get_proxy] current ip invalid and again get it")
self.proxy_list.remove(proxy) self.proxy_list.remove(proxy)
return proxy return proxy
async def reload_proxies(self): async def _reload_proxies(self):
""" """
# 重新加载代理池 # 重新加载代理池
:return: :return:
@ -76,14 +85,23 @@ class ProxyIpPool:
await self.load_proxies() await self.load_proxies()
async def create_ip_pool(ip_pool_count: int, enable_validate_ip) -> ProxyIpPool: IpProxyProvider: Dict[str, ProxyProvider] = {
ProviderNameEnum.JISHU_HTTP_PROVIDER.value: new_jisu_http_proxy(),
ProviderNameEnum.KUAI_DAILI_PROVIDER.value: new_kuai_daili_proxy()
}
async def create_ip_pool(ip_pool_count: int, enable_validate_ip: bool) -> ProxyIpPool:
""" """
创建 IP 代理池 创建 IP 代理池
:param ip_pool_count: :param ip_pool_count: ip池子的数量
:param enable_validate_ip: :param enable_validate_ip: 是否开启验证IP代理
:return: :return:
""" """
pool = ProxyIpPool(ip_pool_count, enable_validate_ip) pool = ProxyIpPool(ip_pool_count=ip_pool_count,
enable_validate_ip=enable_validate_ip,
ip_provider=IpProxyProvider.get(config.IP_PROXY_PROVIDER_NAME)
)
await pool.load_proxies() await pool.load_proxies()
return pool return pool

23
proxy/types.py Normal file
View File

@ -0,0 +1,23 @@
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2024/4/5 10:18
# @Desc : 基础类型
from enum import Enum
from typing import Optional
from pydantic import BaseModel, Field
class ProviderNameEnum(Enum):
JISHU_HTTP_PROVIDER: str = "jishuhttp"
KUAI_DAILI_PROVIDER: str = "kuaidaili"
class IpInfoModel(BaseModel):
"""Unified IP model"""
ip: str = Field(title="ip")
port: int = Field(title="端口")
user: str = Field(title="IP代理认证的用户名")
protocol: str = Field(default="https://", title="代理IP的协议")
password: str = Field(title="IP代理认证用户的密码")
expired_time_ts: Optional[int] = Field(title="IP 过期时间")

View File

@ -2,13 +2,12 @@ httpx==0.24.0
Pillow==9.5.0 Pillow==9.5.0
playwright==1.42.0 playwright==1.42.0
tenacity==8.2.2 tenacity==8.2.2
tornado==6.3.2 tornado
PyExecJS==1.5.1 PyExecJS==1.5.1
opencv-python==4.7.0.72 opencv-python==4.7.0.72
tortoise-orm==0.20.0 tortoise-orm==0.20.0
aiomysql==0.2.0 aiomysql==0.2.0
aerich==0.7.2 aerich==0.7.2
numpy~=1.24.4
redis~=4.6.0 redis~=4.6.0
pydantic==2.5.2 pydantic==2.5.2
aiofiles~=23.2.1 aiofiles~=23.2.1

View File

@ -5,13 +5,14 @@
from unittest import IsolatedAsyncioTestCase from unittest import IsolatedAsyncioTestCase
from proxy.proxy_ip_pool import create_ip_pool from proxy.proxy_ip_pool import create_ip_pool
from proxy.proxy_ip_provider import IpInfoModel from proxy.types import IpInfoModel
class TestIpPool(IsolatedAsyncioTestCase): class TestIpPool(IsolatedAsyncioTestCase):
async def test_ip_pool(self): async def test_ip_pool(self):
pool = await create_ip_pool(ip_pool_count=3, enable_validate_ip=True) pool = await create_ip_pool(ip_pool_count=1, enable_validate_ip=True)
for i in range(1): print("\n")
for i in range(3):
ip_proxy_info: IpInfoModel = await pool.get_proxy() ip_proxy_info: IpInfoModel = await pool.get_proxy()
print(ip_proxy_info) print(ip_proxy_info)
self.assertIsNotNone(ip_proxy_info.ip, msg="验证 ip 是否获取成功") self.assertIsNotNone(ip_proxy_info.ip, msg="验证 ip 是否获取成功")