# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Name : 程序员阿江-Relakkes
# @Time : 2024/6/2 11:05
# @Desc : Local in-memory cache with per-key expiration
import asyncio
import time
from typing import Any, Dict, Optional, Tuple

from abs_cache import Cache


class ExpiringLocalCache(Cache):
    """In-process cache whose entries expire after a per-key time-to-live.

    Entries are stored in a plain dict as ``key -> (value, absolute_expiry)``.
    Expired entries are removed lazily by :meth:`get` and periodically by a
    background asyncio task created at construction time.
    """

    def __init__(self, cron_interval: int = 10):
        """Initialise the local cache and schedule the periodic cleanup task.

        :param cron_interval: interval, in seconds, between two cleanup passes
        """
        self._cron_interval = cron_interval
        # key -> (value, absolute expiry timestamp as returned by time.time())
        self._cache_container: Dict[str, Tuple[Any, float]] = {}
        self._cron_task: Optional[asyncio.Task] = None

        # Start the periodic cleanup task.
        self._schedule_clear()

    def __del__(self):
        """Cancel the periodic cleanup task when the cache is finalised.

        NOTE(review): the cleanup coroutine is a bound method, so the task
        keeps a reference to this instance; the finalizer will presumably
        only run once the event loop has dropped the task.
        """
        # getattr: __init__ may have failed before _cron_task was assigned.
        cron_task = getattr(self, "_cron_task", None)
        if cron_task is None:
            return
        try:
            cron_task.cancel()
        except RuntimeError:
            # cancel() schedules a callback on the loop and raises if the
            # loop is already closed; nothing is left to clean up then.
            pass

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for ``key``.

        :param key: cache key to look up
        :return: the stored value, or ``None`` when the key is missing or
            its entry has expired (an expired entry is removed on access)
        """
        entry = self._cache_container.get(key)
        if entry is None:
            return None

        value, expire_time = entry
        if expire_time < time.time():
            # pop() instead of del: tolerate the key having been removed
            # by the cleanup pass in the meantime.
            self._cache_container.pop(key, None)
            return None

        return value

    def set(self, key: str, value: Any, expire_time: int) -> None:
        """Store ``value`` under ``key``.

        :param key: cache key
        :param value: value to store
        :param expire_time: time-to-live in seconds, counted from now
        """
        self._cache_container[key] = (value, time.time() + expire_time)

    def _schedule_clear(self) -> None:
        """Create the background task that periodically evicts expired keys.

        The running loop is preferred; otherwise the current loop is used,
        and a new loop is created and installed when none is available.
        The task only makes progress while that loop is actually running.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread: fall back to the current
            # loop, creating and installing one if there is none.
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)

        self._cron_task = loop.create_task(self._start_clear_cron())

    def _clear(self) -> None:
        """Remove every entry whose expiry timestamp has passed."""
        now = time.time()
        # Collect the keys first: deleting from a dict while iterating over
        # it raises "RuntimeError: dictionary changed size during iteration".
        expired_keys = [
            key
            for key, (_, expire_time) in self._cache_container.items()
            if expire_time < now
        ]
        for key in expired_keys:
            self._cache_container.pop(key, None)

    async def _start_clear_cron(self) -> None:
        """Run :meth:`_clear` forever, sleeping ``cron_interval`` seconds between passes."""
        while True:
            self._clear()
            await asyncio.sleep(self._cron_interval)


if __name__ == '__main__':
    cache = ExpiringLocalCache(cron_interval=2)
    cache.set('name', '程序员阿江-Relakkes', 3)
    # Read back the key that was actually stored ('name'): the first read
    # returns the value, the second one happens after the 3 s TTL elapsed.
    print(cache.get('name'))
    time.sleep(4)
    print(cache.get('name'))
    del cache
    time.sleep(1)
    print("done")