MediaCrawler/proxy/proxy_account_pool.py

138 lines
3.6 KiB
Python
Raw Normal View History

2023-12-02 08:14:36 +00:00
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2023/12/2 11:18
# @Desc : Account proxy pool that pairs each IP with a phone number one-to-one
2023-07-29 07:35:40 +00:00
from typing import List, Optional, Set, Tuple
import config
class PhonePool:
    """Pool of phone numbers that tracks which ones have been handed out.

    ``phones`` holds the numbers still available, in insertion order.
    ``used_phones`` holds the numbers already returned by ``get_phone``.
    """

    def __init__(self) -> None:
        self.phones: List[str] = []
        self.used_phones: Set[str] = set()

    def add_phone(self, phone: str) -> bool:
        """Append ``phone`` to the available list.

        Returns:
            True if the phone was added, False if it was already available.
        """
        if phone in self.phones:
            return False
        self.phones.append(phone)
        return True

    def remove_phone(self, phone: str) -> bool:
        """Forget a phone that has previously been handed out.

        Only phones present in ``used_phones`` are removed; a phone that is
        merely available (never handed out) is left alone.

        Returns:
            True if the phone was marked as used and has been removed,
            False otherwise.
        """
        if phone not in self.used_phones:
            return False
        self.used_phones.remove(phone)
        # get_phone() pops the number out of ``phones`` when handing it out,
        # so it is normally absent here; it is only present if it was added
        # again afterwards. An unconditional list.remove() would raise
        # ValueError in the normal case.
        if phone in self.phones:
            self.phones.remove(phone)
        return True

    def get_phone(self) -> Optional[str]:
        """Take the oldest available phone and mark it as used.

        Returns:
            The phone number, or None when no phone is available.
        """
        if not self.phones:
            return None
        left_phone = self.phones.pop(0)
        self.used_phones.add(left_phone)
        return left_phone

    def clear(self) -> None:
        """Drop every available and every used phone."""
        self.phones = []
        self.used_phones = set()
class IPPool:
    """Pool of proxy IPs that tracks which ones have been handed out.

    ``ips`` holds the IPs still available, in insertion order.
    ``used_ips`` holds the IPs already returned by ``get_ip``.
    """

    def __init__(self) -> None:
        self.ips: List[str] = []
        self.used_ips: Set[str] = set()

    def add_ip(self, ip: str) -> bool:
        """Append ``ip`` to the available list.

        Returns:
            True if the IP was added, False if it was already available.
        """
        if ip in self.ips:
            return False
        self.ips.append(ip)
        return True

    def remove_ip(self, ip: str) -> bool:
        """Forget an IP that has previously been handed out.

        Only IPs present in ``used_ips`` are removed; an IP that is merely
        available (never handed out) is left alone.

        Returns:
            True if the IP was marked as used and has been removed,
            False otherwise.
        """
        if ip not in self.used_ips:
            return False
        self.used_ips.remove(ip)
        # get_ip() pops the IP out of ``ips`` when handing it out, so it is
        # normally absent here; it is only present if it was added again
        # afterwards. An unconditional list.remove() would raise ValueError
        # in the normal case.
        if ip in self.ips:
            self.ips.remove(ip)
        return True

    def get_ip(self) -> Optional[str]:
        """Take the oldest available IP and mark it as used.

        Returns:
            The IP, or None when no IP is available.
        """
        if not self.ips:
            return None
        left_ip = self.ips.pop(0)
        self.used_ips.add(left_ip)
        return left_ip

    def clear(self) -> None:
        """Drop every available and every used IP."""
        self.ips = []
        self.used_ips = set()
class AccountPool:
    """Pool of (phone, ip) accounts backed by a PhonePool and an IPPool.

    Accounts are paired by position: the n-th phone handed out goes with the
    n-th IP handed out, so both pools must always grow and shrink together.
    """

    def __init__(self):
        self.phone_pool = PhonePool()
        self.ip_pool = IPPool()

    def add_account(self, phone: str, ip: str) -> bool:
        """Add a (phone, ip) pair to the pool.

        The pair is added only when neither the phone nor the IP is already
        available. Both halves are checked before either is added, so a
        duplicate on one side cannot leave the other side added on its own
        and shift every later pairing.

        Returns:
            True if both the phone and the IP were added, False otherwise.
        """
        if phone in self.phone_pool.phones or ip in self.ip_pool.ips:
            return False
        phone_added = self.phone_pool.add_phone(phone)
        ip_added = self.ip_pool.add_ip(ip)
        return phone_added and ip_added

    def remove_account(self, phone: str, ip: str) -> bool:
        """Remove a (phone, ip) pair from the pool.

        The IP removal is attempted only if the phone removal succeeded.

        Returns:
            True if both removals reported success, False otherwise.
        """
        return self.phone_pool.remove_phone(phone) and self.ip_pool.remove_ip(ip)

    def get_account(self) -> Tuple[str, str]:
        """Hand out the next (phone, ip) pair, reloading the pool if needed.

        When either pool is exhausted, the pool is rebuilt once with
        ``reload_account_pool`` and the lookup is retried.

        Returns:
            A ``(phone, ip)`` tuple.

        Raises:
            RuntimeError: if the pool still has no usable pair after the
                reload (for example when the configured lists are empty).
                Retrying without bound here would only end in a
                RecursionError.
        """
        phone = self.phone_pool.get_phone()
        ip = self.ip_pool.get_ip()
        if phone and ip:
            return phone, ip

        # A reload rebuilds the same content every time, so one retry is
        # enough to tell "exhausted" apart from "nothing configured".
        reload_account_pool(self)
        phone = self.phone_pool.get_phone()
        ip = self.ip_pool.get_ip()
        if not phone or not ip:
            raise RuntimeError(
                "account pool has no usable phone/ip pair after reload"
            )
        return phone, ip

    def clear_account(self):
        """Empty both the phone pool and the IP pool."""
        self.phone_pool.clear()
        self.ip_pool.clear()
def reload_account_pool(apo: AccountPool):
    """Reset ``apo`` and refill it from the configured account lists.

    Phones from ``config.PHONE_LIST`` and proxies from
    ``config.IP_PROXY_LIST`` are paired by position; ``zip`` stops at the
    shorter list, so surplus entries in the longer one are not loaded.
    """
    apo.clear_account()
    for account in zip(config.PHONE_LIST, config.IP_PROXY_LIST):
        apo.add_account(*account)
def create_account_pool() -> AccountPool:
    """Build a new AccountPool and load it via ``reload_account_pool``."""
    pool = AccountPool()
    reload_account_pool(pool)
    return pool
if __name__ == '__main__':
    # Manual smoke test: keep drawing accounts from the pool and print each
    # pair, pausing one second between draws.
    import time

    ac_pool = create_account_pool()
    fetch_account = ac_pool.get_account

    p, i = fetch_account()
    while p:
        print(f"get phone:{p}, ip proxy:{i} from account pool")
        p, i = fetch_account()
        time.sleep(1)