MediaCrawler/proxy/providers/kuaidl_proxy.py

135 lines
4.4 KiB
Python
Raw Normal View History

2024-04-05 02:44:05 +00:00
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2024/4/5 09:43
# @Desc : 快代理HTTP实现官方文档https://www.kuaidaili.com/?ref=ldwkjqipvz6c
2024-04-17 15:12:05 +00:00
import os
import re
2024-04-05 02:44:05 +00:00
from typing import Dict, List
2024-04-17 15:12:05 +00:00
import httpx
from pydantic import BaseModel, Field
2024-04-05 02:44:05 +00:00
from proxy import IpGetError, IpInfoModel, ProxyProvider, RedisDbIpCache
2024-04-17 15:12:05 +00:00
from proxy.types import ProviderNameEnum
from tools import utils
class KuaidailiProxyModel(BaseModel):
ip: str = Field("ip")
port: int = Field("端口")
expire_ts: int = Field("过期时间")
def parse_kuaidaili_proxy(proxy_info: str) -> KuaidailiProxyModel:
"""
解析快代理的IP信息
Args:
proxy_info:
Returns:
"""
proxies: List[str] = proxy_info.split(":")
if len(proxies) != 2:
raise Exception("not invalid kuaidaili proxy info")
pattern = r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}):(\d{1,5}),(\d+)'
match = re.search(pattern, proxy_info)
if not match.groups():
raise Exception("not match kuaidaili proxy info")
return KuaidailiProxyModel(
ip=match.groups()[0],
port=int(match.groups()[1]),
expire_ts=int(match.groups()[2])
)
2024-04-05 02:44:05 +00:00
class KuaiDaiLiProxy(ProxyProvider):
2024-04-17 15:19:44 +00:00
def __init__(self, kdl_user_name: str, kdl_user_pwd: str, kdl_secret_id: str, kdl_signature: str):
2024-04-17 15:12:05 +00:00
"""
Args:
kdl_user_name:
kdl_user_pwd:
"""
self.kdl_user_name = kdl_user_name
self.kdl_user_pwd = kdl_user_pwd
self.api_base = "https://dps.kdlapi.com/"
2024-04-17 15:19:44 +00:00
self.secret_id = kdl_secret_id
self.signature = kdl_signature
2024-04-17 15:12:05 +00:00
self.ip_cache = RedisDbIpCache()
self.proxy_brand_name = ProviderNameEnum.KUAI_DAILI_PROVIDER.value
self.params = {
2024-04-17 15:19:44 +00:00
"secret_id": self.secret_id,
"signature": self.signature,
2024-04-17 15:12:05 +00:00
"pt": 1,
"format": "json",
"sep": 1,
"f_et": 1,
}
async def get_proxies(self, num: int) -> List[IpInfoModel]:
"""
快代理实现
Args:
num:
Returns:
"""
uri = "/api/getdps/"
# 优先从缓存中拿 IP
ip_cache_list = self.ip_cache.load_all_ip(proxy_brand_name=self.proxy_brand_name)
if len(ip_cache_list) >= num:
return ip_cache_list[:num]
# 如果缓存中的数量不够从IP代理商获取补上再存入缓存中
need_get_count = num - len(ip_cache_list)
self.params.update({"num": need_get_count})
ip_infos: List[IpInfoModel] = []
async with httpx.AsyncClient() as client:
response = await client.get(self.api_base + uri, params=self.params)
if response.status_code != 200:
utils.logger.error(f"[KuaiDaiLiProxy.get_proxies] statuc code not 200 and response.txt:{response.text}")
raise Exception("get ip error from proxy provider and status code not 200 ...")
ip_response: Dict = response.json()
if ip_response.get("code") != 0:
utils.logger.error(f"[KuaiDaiLiProxy.get_proxies] code not 0 and msg:{ip_response.get('msg')}")
raise Exception("get ip error from proxy provider and code not 0 ...")
proxy_list: List[str] = ip_response.get("data", {}).get("proxy_list")
for proxy in proxy_list:
proxy_model = parse_kuaidaili_proxy(proxy)
ip_info_model = IpInfoModel(
ip=proxy_model.ip,
port=proxy_model.port,
user=self.kdl_user_name,
password=self.kdl_user_pwd,
expired_time_ts=proxy_model.expire_ts,
)
ip_key = f"{self.proxy_brand_name}_{ip_info_model.ip}_{ip_info_model.port}"
self.ip_cache.set_ip(ip_key, ip_info_model.model_dump_json(), ex=ip_info_model.expired_time_ts)
ip_infos.append(ip_info_model)
return ip_cache_list + ip_infos
2024-04-05 02:44:05 +00:00
def new_kuai_daili_proxy() -> KuaiDaiLiProxy:
"""
构造快代理HTTP实例
Returns:
"""
2024-04-17 15:12:05 +00:00
return KuaiDaiLiProxy(
2024-04-17 15:19:44 +00:00
kdl_secret_id=os.getenv("kdl_secret_id", "你的快代理secert_id"),
kdl_signature=os.getenv("kdl_signature", "你的快代理签名"),
2024-04-17 15:12:05 +00:00
kdl_user_name=os.getenv("kdl_user_name", "你的快代理用户名"),
kdl_user_pwd=os.getenv("kdl_user_pwd", "你的快代理密码"),
)