class IpGetError(Exception):
    """Error type for failures while getting proxy IPs."""


class ProxyProvider(ABC):
    """Abstract interface that each HTTP proxy vendor implements."""

    @abstractmethod
    async def get_proxies(self, num: int) -> List[Dict]:
        """
        Fetch proxy IPs; every HTTP proxy vendor must implement this method.
        :param num: number of IPs to extract
        :return: the extracted proxy IP records
        """
        pass


class RedisDbIpCache:
    """Cache of proxy IP records stored in Redis."""

    def __init__(self):
        self.redis_client = redis.Redis(host=config.REDIS_DB_HOST, password=config.REDIS_DB_PWD)

    def set_ip(self, ip_key: str, ip_value_info: str, ex: int):
        """
        Store one IP record with an expiry; Redis removes it once it expires.
        :param ip_key: Redis key for the record
        :param ip_value_info: serialized IP record
        :param ex: expiry passed to Redis ``SET ... EX`` (seconds)
        :return:
        """
        self.redis_client.set(name=ip_key, value=ip_value_info, ex=ex)

    def load_all_ip(self, proxy_brand_name: str) -> "List[IpInfoModel]":
        """
        Load all unexpired IP records of one proxy vendor from Redis.
        :param proxy_brand_name: vendor name; keys matching ``<name>_*`` are read
        :return: the records that could be read and parsed
        """
        all_ip_list: "List[IpInfoModel]" = []
        all_ip_keys: List[bytes] = self.redis_client.keys(pattern=f"{proxy_brand_name}_*")
        for ip_key in all_ip_keys:
            # Best-effort: a record that cannot be read or parsed is logged and
            # skipped so that the remaining records are still loaded.
            try:
                ip_value = self.redis_client.get(ip_key)
                if not ip_value:
                    # Key expired between listing and reading.
                    continue
                all_ip_list.append(IpInfoModel(**json.loads(ip_value)))
            except Exception as e:
                utils.logger.error(
                    f"[RedisDbIpCache.load_all_ip] get ip err from redis db, key: {ip_key!r}, err: {e}"
                )
        return all_ip_list
proxy import IpGetError, ProxyProvider, RedisDbIpCache +from proxy.types import IpInfoModel from tools import utils -class IpGetError(Exception): - """ ip get error""" - - -class IpInfoModel(BaseModel): - """Unified IP model""" - ip: str = Field(title="ip") - port: int = Field(title="端口") - user: str = Field(title="IP代理认证的用户名") - protocol: str = Field(default="https://", title="代理IP的协议") - password: str = Field(title="IP代理认证用户的密码") - expired_time_ts: Optional[int] = Field(title="IP 过期时间") - - -class ProxyProvider(ABC): - @abstractmethod - async def get_proxies(self, num: int) -> List[Dict]: - """ - 获取 IP 的抽象方法,不同的 HTTP 代理商需要实现该方法 - :param num: 提取的 IP 数量 - :return: - """ - pass - - -class RedisDbIpCache: - def __init__(self): - self.redis_client = redis.Redis(host=config.REDIS_DB_HOST, password=config.REDIS_DB_PWD) - - def set_ip(self, ip_key: str, ip_value_info: str, ex: int): - """ - 设置IP并带有过期时间,到期之后由 redis 负责删除 - :param ip_key: - :param ip_value_info: - :param ex: - :return: - """ - self.redis_client.set(name=ip_key, value=ip_value_info, ex=ex) - - def load_all_ip(self, proxy_brand_name: str) -> List[IpInfoModel]: - """ - 从 redis 中加载所有还未过期的 IP 信息 - :param proxy_brand_name: 代理商名称 - :return: - """ - all_ip_list: List[IpInfoModel] = [] - all_ip_keys: List[str] = self.redis_client.keys(pattern=f"{proxy_brand_name}_*") - try: - for ip_key in all_ip_keys: - ip_value = self.redis_client.get(ip_key) - if not ip_value: - continue - all_ip_list.append(IpInfoModel(**json.loads(ip_value))) - except Exception as e: - utils.logger.error("[RedisDbIpCache.load_all_ip] get ip err from redis db", e) - return all_ip_list - - class JiSuHttpProxy(ProxyProvider): def __init__(self, key: str, crypto: str, time_validity_period: int): """ 极速HTTP 代理IP实现 - 官网地址:https://www.jisuhttp.com/?pl=mAKphQ&plan=ZY&kd=Yang :param key: 提取key值 (去官网注册后获取) :param crypto: 加密签名 (去官网注册后获取) """ @@ -131,7 +66,7 @@ class JiSuHttpProxy(ProxyProvider): 
class KuaiDaiLiProxy(ProxyProvider):
    """KuaiDaiLi HTTP proxy provider; IP extraction is not implemented yet."""

    async def get_proxies(self, num: int) -> List[Dict]:
        """
        Fetch proxy IPs from KuaiDaiLi.
        :param num: number of IPs to extract
        :return: never returns for now
        :raises NotImplementedError: always, until the provider is implemented
        """
        # Fail loudly instead of returning None, which would violate the
        # declared return type and only break later in the caller.
        raise NotImplementedError("KuaiDaiLiProxy.get_proxies is not implemented yet")


def new_kuai_daili_proxy() -> KuaiDaiLiProxy:
    """
    Build a KuaiDaiLi HTTP proxy provider instance.
    Returns:
        a new KuaiDaiLiProxy
    """
    return KuaiDaiLiProxy()
2023/12/2 11:18 -# @Desc : IP 和 手机号 一一配对的账号代理池 - -from typing import List, Optional, Set, Tuple - -import config - - -class PhonePool: - """phone pool class""" - - def __init__(self) -> None: - self.phones: List[str] = [] - self.used_phones: Set[str] = set() - - def add_phone(self, phone: str) -> bool: - """add phone to the pool""" - if phone not in self.phones: - self.phones.append(phone) - return True - return False - - def remove_phone(self, phone: str) -> bool: - """remove phone from the pool""" - if phone in self.used_phones: - self.phones.remove(phone) - self.used_phones.remove(phone) - return True - return False - - def get_phone(self) -> Optional[str]: - """get phone and mark as used""" - if self.phones: - left_phone = self.phones.pop(0) - self.used_phones.add(left_phone) - return left_phone - return None - - def clear(self): - """clear phone pool""" - self.phones = [] - self.used_phones = set() - - -class IPPool: - def __init__(self) -> None: - self.ips: List[str]= [] - self.used_ips: Set[str]= set() - - def add_ip(self, ip): - """添加ip""" - if ip not in self.ips: - self.ips.append(ip) - return True - return False - - def remove_ip(self, ip: str) -> bool: - """remove ip""" - if ip in self.used_ips: - self.ips.remove(ip) - self.used_ips.remove(ip) - return True - return False - - def get_ip(self) -> Optional[str]: - """get ip and mark as used""" - if self.ips: - left_ips = self.ips.pop(0) - self.used_ips.add(left_ips) - return left_ips - return None - - def clear(self): - """ clear ip pool""" - self.ips = [] - self.used_ips = set() - - -class AccountPool: - """account pool class""" - - def __init__(self): - self.phone_pool = PhonePool() - self.ip_pool = IPPool() - - def add_account(self, phone: str, ip: str) -> bool: - """add account to pool with phone and ip""" - if self.phone_pool.add_phone(phone) and self.ip_pool.add_ip(ip): - return True - return False - - def remove_account(self, phone: str, ip: str) -> bool: - """remove account from pool """ - if 
self.phone_pool.remove_phone(phone) and self.ip_pool.remove_ip(ip): - return True - return False - - def get_account(self) -> Tuple[str, str]: - """get account if no account, reload account pool""" - phone = self.phone_pool.get_phone() - ip = self.ip_pool.get_ip() - if not phone or not ip: - reload_account_pool(self) - return self.get_account() - return phone, ip - - def clear_account(self): - """clear account pool""" - self.phone_pool.clear() - self.ip_pool.clear() - - -def reload_account_pool(apo: AccountPool): - """reload account pool""" - apo.clear_account() - for phone, ip in zip(config.PHONE_LIST, config.IP_PROXY_LIST): - apo.add_account(phone, ip) - - -def create_account_pool() -> AccountPool: - """create account pool""" - apo = AccountPool() - reload_account_pool(apo=apo) - return apo - - -if __name__ == '__main__': - import time - - ac_pool = create_account_pool() - p, i = ac_pool.get_account() - while p: - print(f"get phone:{p}, ip proxy:{i} from account pool") - p, i = ac_pool.get_account() - time.sleep(1) diff --git a/proxy/proxy_ip_pool.py b/proxy/proxy_ip_pool.py index be5e79f..febc39e 100644 --- a/proxy/proxy_ip_pool.py +++ b/proxy/proxy_ip_pool.py @@ -2,41 +2,50 @@ # @Author : relakkes@gmail.com # @Time : 2023/12/2 13:45 # @Desc : ip代理池实现 -import json -import pathlib import random -from typing import List +from typing import Dict, List import httpx from tenacity import retry, stop_after_attempt, wait_fixed +import config +from proxy.providers import new_jisu_http_proxy, new_kuai_daili_proxy from tools import utils -from .proxy_ip_provider import IpInfoModel, IpProxy +from .base_proxy import ProxyProvider +from .types import IpInfoModel, ProviderNameEnum class ProxyIpPool: - def __init__(self, ip_pool_count: int, enable_validate_ip: bool) -> None: + def __init__(self, ip_pool_count: int, enable_validate_ip: bool, ip_provider: ProxyProvider) -> None: + """ + + Args: + ip_pool_count: + enable_validate_ip: + ip_provider: + """ self.valid_ip_url = 
"https://httpbin.org/ip" # 验证 IP 是否有效的地址 self.ip_pool_count = ip_pool_count self.enable_validate_ip = enable_validate_ip self.proxy_list: List[IpInfoModel] = [] + self.ip_provider: ProxyProvider = ip_provider async def load_proxies(self) -> None: """ - 解析 - :return: - """ - self.proxy_list = await IpProxy.get_proxies(self.ip_pool_count) + 加载IP代理 + Returns: - async def is_valid_proxy(self, proxy: IpInfoModel) -> bool: + """ + self.proxy_list = await self.ip_provider.get_proxies(self.ip_pool_count) + + async def _is_valid_proxy(self, proxy: IpInfoModel) -> bool: """ 验证代理IP是否有效 :param proxy: :return: """ - - utils.logger.info(f"[ProxyIpPool.is_valid_proxy] testing {proxy.ip} is it valid ") + utils.logger.info(f"[ProxyIpPool._is_valid_proxy] testing {proxy.ip} is it valid ") try: httpx_proxy = { f"{proxy.protocol}": f"http://{proxy.user}:{proxy.password}@{proxy.ip}:{proxy.port}" @@ -48,7 +57,7 @@ class ProxyIpPool: else: return False except Exception as e: - utils.logger.info(f"[ProxyIpPool.is_valid_proxy] testing {proxy.ip} err: {e}") + utils.logger.info(f"[ProxyIpPool._is_valid_proxy] testing {proxy.ip} err: {e}") raise e @retry(stop=stop_after_attempt(3), wait=wait_fixed(1)) @@ -58,16 +67,16 @@ class ProxyIpPool: :return: """ if len(self.proxy_list) == 0: - await self.reload_proxies() + await self._reload_proxies() proxy = random.choice(self.proxy_list) if self.enable_validate_ip: - if not await self.is_valid_proxy(proxy): + if not await self._is_valid_proxy(proxy): raise Exception("[ProxyIpPool.get_proxy] current ip invalid and again get it") self.proxy_list.remove(proxy) return proxy - async def reload_proxies(self): + async def _reload_proxies(self): """ # 重新加载代理池 :return: @@ -76,14 +85,23 @@ class ProxyIpPool: await self.load_proxies() -async def create_ip_pool(ip_pool_count: int, enable_validate_ip) -> ProxyIpPool: +IpProxyProvider: Dict[str, ProxyProvider] = { + ProviderNameEnum.JISHU_HTTP_PROVIDER.value: new_jisu_http_proxy(), + 
async def create_ip_pool(ip_pool_count: int, enable_validate_ip: bool) -> ProxyIpPool:
    """
    Create an IP proxy pool for the provider named in config and load its proxies.
    :param ip_pool_count: number of IPs requested from the provider
    :param enable_validate_ip: whether proxy IPs are validated before use
    :return: a pool whose proxies have been loaded
    :raises ValueError: if config.IP_PROXY_PROVIDER_NAME is not a registered provider
    """
    provider_name = config.IP_PROXY_PROVIDER_NAME
    ip_provider = IpProxyProvider.get(provider_name)
    if ip_provider is None:
        # Without this check the pool is built around None and only fails
        # later with an AttributeError inside load_proxies().
        supported = ", ".join(sorted(IpProxyProvider))
        raise ValueError(
            f"[create_ip_pool] unknown IP_PROXY_PROVIDER_NAME: {provider_name!r}, supported: {supported}"
        )
    pool = ProxyIpPool(
        ip_pool_count=ip_pool_count,
        enable_validate_ip=enable_validate_ip,
        ip_provider=ip_provider,
    )
    await pool.load_proxies()
    return pool
create_ip_pool -from proxy.proxy_ip_provider import IpInfoModel +from proxy.types import IpInfoModel class TestIpPool(IsolatedAsyncioTestCase): async def test_ip_pool(self): - pool = await create_ip_pool(ip_pool_count=3, enable_validate_ip=True) - for i in range(1): + pool = await create_ip_pool(ip_pool_count=1, enable_validate_ip=True) + print("\n") + for i in range(3): ip_proxy_info: IpInfoModel = await pool.get_proxy() print(ip_proxy_info) self.assertIsNotNone(ip_proxy_info.ip, msg="验证 ip 是否获取成功")