diff --git a/.gitignore b/.gitignore
index 8996d3e..8402108 100644
--- a/.gitignore
+++ b/.gitignore
@@ -165,3 +165,4 @@ cython_debug/
 /temp_image/
 /browser_data/
 /data/
+/cache
\ No newline at end of file
diff --git a/media_platform/bilibili/client.py b/media_platform/bilibili/client.py
index b4d8e71..d2d7e89 100644
--- a/media_platform/bilibili/client.py
+++ b/media_platform/bilibili/client.py
@@ -4,7 +4,7 @@
 # @Desc : bilibili 请求客户端
 import asyncio
 import json
-from typing import Any, Callable, Dict, Optional, List, Tuple
+from typing import Any, Callable, Dict, List, Optional, Tuple
 from urllib.parse import urlencode
 
 import httpx
@@ -12,9 +12,9 @@ from playwright.async_api import BrowserContext, Page
 
 from tools import utils
 
-from .help import BilibiliSign
 from .exception import DataFetchError
-from .field import SearchOrderType, CommentOrderType
+from .field import CommentOrderType, SearchOrderType
+from .help import BilibiliSign
 
 
 class BilibiliClient:
diff --git a/media_platform/bilibili/core.py b/media_platform/bilibili/core.py
index cd7587f..feca000 100644
--- a/media_platform/bilibili/core.py
+++ b/media_platform/bilibili/core.py
@@ -20,7 +20,6 @@ from proxy.proxy_account_pool import AccountPool
 from tools import utils
 from var import comment_tasks_var, crawler_type_var
 
-
 from .client import BilibiliClient
 from .field import SearchOrderType
 from .login import BilibiliLogin
diff --git a/media_platform/bilibili/login.py b/media_platform/bilibili/login.py
index 25d8d68..9426c46 100644
--- a/media_platform/bilibili/login.py
+++ b/media_platform/bilibili/login.py
@@ -16,7 +16,6 @@ from tenacity import (RetryError, retry, retry_if_result, stop_after_attempt,
 import config
 from base.base_crawler import AbstractLogin
 from tools import utils
-from base.base_crawler import AbstractLogin
 
 
 class BilibiliLogin(AbstractLogin):
diff --git a/media_platform/douyin/client.py b/media_platform/douyin/client.py
index 6fe2eec..853faae 100644
--- a/media_platform/douyin/client.py
+++ b/media_platform/douyin/client.py
@@ -1,7 +1,7 @@
 import asyncio
 import copy
 import urllib.parse
-from typing import Any, Callable, Dict, Optional, List
+from typing import Any, Callable, Dict, List, Optional
 
 import execjs
 import httpx
diff --git a/proxy/proxy_ip_pool.py b/proxy/proxy_ip_pool.py
index 736aec4..4763528 100644
--- a/proxy/proxy_ip_pool.py
+++ b/proxy/proxy_ip_pool.py
@@ -2,8 +2,10 @@
 # @Author : relakkes@gmail.com
 # @Time : 2023/12/2 13:45
 # @Desc : ip代理池实现
+import json
+import pathlib
 import random
-from typing import List
+from typing import Dict, List
 
 import httpx
 from tenacity import retry, stop_after_attempt, wait_fixed
@@ -22,7 +24,7 @@ class ProxyIpPool:
 
     async def load_proxies(self) -> None:
         """
-        从 HTTP 代理商获取 IP 列表
+        解析
         :return:
         """
         self.proxy_list = await IpProxy.get_proxies(self.ip_pool_count)
diff --git a/proxy/proxy_ip_provider.py b/proxy/proxy_ip_provider.py
index e494f68..77318b0 100644
--- a/proxy/proxy_ip_provider.py
+++ b/proxy/proxy_ip_provider.py
@@ -5,14 +5,17 @@
 # @Url : 现在实现了极速HTTP的接口,官网地址:https://www.jisuhttp.com/?pl=mAKphQ&plan=ZY&kd=Yang
 
 import asyncio
+import json
 import os
 from abc import ABC, abstractmethod
 from typing import Dict, List, Optional
 from urllib.parse import urlencode
 
 import httpx
+import redis
 from pydantic import BaseModel, Field
 
+import config
 from tools import utils
 
 
@@ -41,71 +44,108 @@ class ProxyProvider(ABC):
         pass
 
 
+class RedisDbIpCache:
+    def __init__(self):
+        self.redis_client = redis.Redis(host=config.REDIS_DB_HOST, password=config.REDIS_DB_PWD)
+
+    def set_ip(self, ip_key: str, ip_value_info: str, ex: int):
+        """
+        设置IP并带有过期时间,到期之后由 redis 负责删除
+        :param ip_key:
+        :param ip_value_info:
+        :param ex:
+        :return:
+        """
+        self.redis_client.set(name=ip_key, value=ip_value_info, ex=ex)
+
+    def load_all_ip(self, proxy_brand_name: str) -> List[IpInfoModel]:
+        """
+        从 redis 中加载所有还未过期的 IP 信息
+        :param proxy_brand_name: 代理商名称
+        :return:
+        """
+        all_ip_list: List[IpInfoModel] = []
+        all_ip_keys: List[str] = self.redis_client.keys(pattern=f"{proxy_brand_name}_*")
+        try:
+            for ip_key in all_ip_keys:
+                ip_value = self.redis_client.get(ip_key)
+                if not ip_value:
+                    continue
+                all_ip_list.append(IpInfoModel(**json.loads(ip_value)))
+        except Exception as e:
+            utils.logger.error("[RedisDbIpCache.load_all_ip] get ip err from redis db", e)
+        return all_ip_list
+
+
 class JiSuHttpProxy(ProxyProvider):
-    def __init__(self, exract_type: str, key: str, crypto: str, res_type: str, protocol: int, time: int):
+    def __init__(self, key: str, crypto: str, time_validity_period: int):
         """
         极速HTTP 代理IP实现
         官网地址:https://www.jisuhttp.com/?pl=mAKphQ&plan=ZY&kd=Yang
-        :param exract_type: 提取方式
-        :param key: 提取key值 (到上面链接的官网去注册后获取)
-        :param crypto: 加密签名 (到上面链接的官网去注册后获取)
-        :param res_type: 返回的数据格式:TXT、JSON
-        :param protocol: IP协议:1:HTTP、2:HTTPS、3:SOCKS5
-        :param time: IP使用时长,支持3、5、10、15、30分钟时效
+        :param key: 提取key值 (去官网注册后获取)
+        :param crypto: 加密签名 (去官网注册后获取)
         """
-        self.exract_type = exract_type
+        self.proxy_brand_name = "JISUHTTP"
         self.api_path = "https://api.jisuhttp.com"
         self.params = {
             "key": key,
             "crypto": crypto,
-            "type": res_type,
-            "port": protocol,
-            "time": time,
+            "time": time_validity_period,  # IP使用时长,支持3、5、10、15、30分钟时效
+            "type": "json",  # 数据结果为json
+            "port": "2",  # IP协议:1:HTTP、2:HTTPS、3:SOCKS5
             "pw": "1",  # 是否使用账密验证, 1:是,0:否,否表示白名单验证;默认为0
             "se": "1",  # 返回JSON格式时是否显示IP过期时间, 1:显示,0:不显示;默认为0
         }
+        self.ip_cache = RedisDbIpCache()
 
     async def get_proxies(self, num: int) -> List[IpInfoModel]:
        """
        :param num:
        :return:
        """
-        if self.exract_type == "API":
-            uri = "/fetchips"
-            self.params.update({"num": num})
-            ip_infos = []
-            async with httpx.AsyncClient() as client:
-                url = self.api_path + uri + '?' + urlencode(self.params)
-                utils.logger.info(f"[JiSuHttpProxy] get ip proxy url:{url}")
-                response = await client.get(url, headers={"User-Agent": "MediaCrawler"})
-                res_dict: Dict = response.json()
-                if res_dict.get("code") == 0:
-                    data: List[Dict] = res_dict.get("data")
-                    for ip_item in data:
-                        ip_info_model = IpInfoModel(
-                            ip=ip_item.get("ip"),
-                            port=ip_item.get("port"),
-                            user=ip_item.get("user"),
-                            password=ip_item.get("pass"),
-                            expired_time_ts=utils.get_unix_time_from_time_str(ip_item.get("expire"))
-                        )
-                        ip_infos.append(ip_info_model)
-                else:
-                    raise IpGetError(res_dict.get("msg", "unkown err"))
-            return ip_infos
-        else:
-            pass
+        # 优先从缓存中拿 IP
+        ip_cache_list = self.ip_cache.load_all_ip(proxy_brand_name=self.proxy_brand_name)
+        if len(ip_cache_list) >= num:
+            return ip_cache_list[:num]
+
+        # 如果缓存中的数量不够,从IP代理商获取补上,再存入缓存中
+        need_get_count = num - len(ip_cache_list)
+        self.params.update({"num": need_get_count})
+        ip_infos = []
+        async with httpx.AsyncClient() as client:
+            url = self.api_path + "/fetchips" + '?' + urlencode(self.params)
+            utils.logger.info(f"[JiSuHttpProxy] get ip proxy url:{url}")
+            response = await client.get(url, headers={
+                "User-Agent": "MediaCrawler https://github.com/NanmiCoder/MediaCrawler"})
+            res_dict: Dict = response.json()
+            if res_dict.get("code") == 0:
+                data: List[Dict] = res_dict.get("data")
+                current_ts = utils.get_unix_timestamp()
+                for ip_item in data:
+                    ip_info_model = IpInfoModel(
+                        ip=ip_item.get("ip"),
+                        port=ip_item.get("port"),
+                        user=ip_item.get("user"),
+                        password=ip_item.get("pass"),
+                        expired_time_ts=utils.get_unix_time_from_time_str(ip_item.get("expire"))
+                    )
+                    ip_key = f"JISUHTTP_{ip_info_model.ip}_{ip_info_model.port}_{ip_info_model.user}_{ip_info_model.password}"
+                    ip_value = ip_info_model.model_dump_json()
+                    ip_infos.append(ip_info_model)
+                    self.ip_cache.set_ip(ip_key, ip_value, ex=ip_info_model.expired_time_ts - current_ts)
+            else:
+                raise IpGetError(res_dict.get("msg", "unkown err"))
+        return ip_cache_list + ip_infos
 
 
 IpProxy = JiSuHttpProxy(
     key=os.getenv("jisu_key", ""),  # 通过环境变量的方式获取极速HTTPIP提取key值
     crypto=os.getenv("jisu_crypto", ""),  # 通过环境变量的方式获取极速HTTPIP提取加密签名
-    res_type="json",
-    protocol=2,
-    time=30
+    time_validity_period=30  # 30分钟(最长时效)
 )
 
 if __name__ == '__main__':
+    # 每一次提取都要消耗 IP 数量,谨慎使用
     _ip_infos = asyncio.run(IpProxy.get_proxies(1))
     print(_ip_infos)
diff --git a/test/test_proxy_ip_pool.py b/test/test_proxy_ip_pool.py
index 5530cbe..4c2a52c 100644
--- a/test/test_proxy_ip_pool.py
+++ b/test/test_proxy_ip_pool.py
@@ -10,8 +10,9 @@ from proxy.proxy_ip_provider import IpInfoModel
 
 class TestIpPool(IsolatedAsyncioTestCase):
     async def test_ip_pool(self):
-        pool = await create_ip_pool(ip_pool_count=30, enable_validate_ip=False)
-        for i in range(30):
+        pool = await create_ip_pool(ip_pool_count=3, enable_validate_ip=True)
+        for i in range(3):
             ip_proxy_info: IpInfoModel = await pool.get_proxy()
-            self.assertIsNotNone(ip_proxy_info.ip, msg="验证 ip 是否获取成功")
             print(ip_proxy_info)
+            self.assertIsNotNone(ip_proxy_info.ip, msg="验证 ip 是否获取成功")
+
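
For reference, a minimal usage sketch of the Redis-backed IP cache introduced in proxy/proxy_ip_provider.py above. It is illustrative only, not part of the patch, and assumes config.REDIS_DB_HOST / config.REDIS_DB_PWD point at a reachable Redis instance and that the jisu_key / jisu_crypto environment variables are set. Note that each real extraction against the JiSu HTTP API consumes IP quota, as the comment in __main__ warns.

# Usage sketch (not part of the patch): exercises the cache-then-API path.
import asyncio

from proxy.proxy_ip_provider import IpProxy, RedisDbIpCache


async def main():
    # First call: the cache is empty, so the JiSu HTTP API is hit and every
    # returned IP is written to Redis under "JISUHTTP_<ip>_<port>_<user>_<password>"
    # with a TTL equal to its remaining validity (expired_time_ts - now).
    ip_infos = await IpProxy.get_proxies(2)
    print(ip_infos)

    # Later calls are served straight from Redis while enough unexpired keys remain.
    cached = RedisDbIpCache().load_all_ip(proxy_brand_name="JISUHTTP")
    print(f"{len(cached)} unexpired proxies currently cached")


if __name__ == '__main__':
    asyncio.run(main())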