考虑以下几点:
@property
def name(self):
    """Return the lazily computed name, caching it on the instance.

    The value is computed on first access and stored in ``self._name``;
    every later access returns the stored value without recomputing it.
    """
    try:
        return self._name
    except AttributeError:
        # First access: run the expensive calculation once and remember it.
        self._name = 1 + 1
        return self._name
我是新手,但我认为这种缓存逻辑可以提取成一个装饰器,只是我还没有找到现成的 ;)
PS:实际的计算不依赖于可变值。
考虑以下几点:
@property
def name(self):
    """Return the lazily computed name, caching it on the instance.

    The value is computed on first access and stored in ``self._name``;
    every later access returns the stored value without recomputing it.
    """
    try:
        return self._name
    except AttributeError:
        # First access: run the expensive calculation once and remember it.
        self._name = 1 + 1
        return self._name
我是新手,但我认为这种缓存逻辑可以提取成一个装饰器,只是我还没有找到现成的 ;)
PS:实际的计算不依赖于可变值。
当前回答
from functools import wraps
def cache(maxsize=128):
    """Build a decorator that memoizes a function's return values.

    Results are kept in a per-function dictionary keyed by the call
    arguments. When the store is full, the oldest inserted entry is evicted
    (insertion order; a cache hit does not refresh an entry's position).

    Args:
        maxsize: Maximum number of results kept per decorated function.
            ``None`` means unbounded; ``0`` or a negative value disables
            storing results altogether.

    Returns:
        A decorator. The wrapped function accepts an extra keyword-only
        argument ``no_cache``; when true, the call bypasses the cache
        entirely (the result is neither looked up nor stored).
    """

    def decorator(func):
        # One store per decorated function, so reusing the same decorator
        # object on several functions cannot mix up their results.
        store = {}

        @wraps(func)
        def inner(*args, no_cache=False, **kwargs):
            if no_cache:
                return func(*args, **kwargs)

            # Key on the argument values themselves rather than on their
            # joined string form, so f(1) / f("1") and f("a_b") / f("a", "b")
            # no longer share an entry. Keyword order is irrelevant.
            key = (args, tuple(sorted(kwargs.items())))
            try:
                hash(key)
            except TypeError:
                # Unhashable arguments (lists, dicts, ...) fall back to a
                # textual key so such calls keep working as before.
                key = repr(key)

            if key in store:
                return store[key]

            result = func(*args, **kwargs)
            if maxsize is None:
                store[key] = result
            elif maxsize > 0:
                # Evict before inserting so the store never exceeds maxsize.
                while len(store) >= maxsize and key not in store:
                    del store[next(iter(store))]
                store[key] = result
            return result

        return inner

    return decorator
def async_cache(maxsize=128):
    """Build a decorator that memoizes the results of a coroutine function.

    The awaited result (not the coroutine object) is stored in a
    per-function dictionary keyed by the call arguments. When the store is
    full, the oldest inserted entry is evicted (insertion order; a cache hit
    does not refresh an entry's position).

    Args:
        maxsize: Maximum number of results kept per decorated function.
            ``None`` means unbounded; ``0`` or a negative value disables
            storing results altogether.

    Returns:
        A decorator. The wrapped coroutine function accepts an extra
        keyword-only argument ``no_cache``; when true, the call bypasses the
        cache entirely (the result is neither looked up nor stored).
    """

    def decorator(func):
        # One store per decorated function, so reusing the same decorator
        # object on several functions cannot mix up their results.
        store = {}

        @wraps(func)
        async def inner(*args, no_cache=False, **kwargs):
            if no_cache:
                return await func(*args, **kwargs)

            # Key on the argument values themselves rather than on their
            # joined string form, so f(1) / f("1") and f("a_b") / f("a", "b")
            # no longer share an entry. Keyword order is irrelevant.
            key = (args, tuple(sorted(kwargs.items())))
            try:
                hash(key)
            except TypeError:
                # Unhashable arguments (lists, dicts, ...) fall back to a
                # textual key so such calls keep working as before.
                key = repr(key)

            if key in store:
                return store[key]

            result = await func(*args, **kwargs)
            if maxsize is None:
                store[key] = result
            elif maxsize > 0:
                # Evict before inserting so the store never exceeds maxsize.
                while len(store) >= maxsize and key not in store:
                    del store[next(iter(store))]
                store[key] = result
            return result

        return inner

    return decorator
示例使用
import asyncio
import aiohttp
# Removes the aiohttp ClientSession instance warning.
class HTTPSession(aiohttp.ClientSession):
    """aiohttp client session that tries to close itself when finalized.

    ``__del__`` is overridden so that a session which was never closed
    explicitly gets closed when the object is garbage-collected; this is
    intended to avoid aiohttp's unclosed-session warning.
    """

    def __init__(self, loop=None) -> None:
        """Create the session on ``loop``, or on the current event loop."""
        super().__init__(loop=loop or asyncio.get_event_loop())

    def __del__(self) -> None:
        """Close the session, and then its event loop, if still open."""
        if self.closed:
            return
        loop = self.loop
        # run_until_complete() raises RuntimeError on a closed or already
        # running loop; inside a finalizer that only produces noise and
        # leaves an un-awaited coroutine behind, so skip the cleanup then.
        if loop.is_closed() or loop.is_running():
            return
        loop.run_until_complete(self.close())
        loop.close()
session = HTTPSession()
@async_cache()
async def query(url, method="get", res_method="text", *args, **kwargs):
    """Send an HTTP request through the shared session and read the body.

    Args:
        url: Target URL.
        method: Name of the session method to call; lower-cased before the
            lookup (e.g. ``"get"``, ``"post"``).
        res_method: Name of the response method awaited to produce the
            return value (e.g. ``"text"``).
        *args, **kwargs: Forwarded to the session method.

    Returns:
        Whatever the awaited response method returns.
    """
    request = getattr(session, method.lower())
    async with request(url, *args, **kwargs) as response:
        reader = getattr(response, res_method)
        return await reader()
async def get(url, *args, **kwargs):
    """Run ``query`` with the ``"get"`` method and return its result."""
    response = await query(url, "get", *args, **kwargs)
    return response
async def post(url, *args, **kwargs):
    """Run ``query`` with the ``"post"`` method and return its result."""
    response = await query(url, "post", *args, **kwargs)
    return response
async def delete(url, *args, **kwargs):
    """Run ``query`` with the ``"delete"`` method and return its result."""
    response = await query(url, "delete", *args, **kwargs)
    return response
其他回答
尝试joblib https://joblib.readthedocs.io/en/latest/memory.html
from joblib import Memory
# NOTE: ``cachedir`` is not defined in this snippet; it is expected to hold
# the path of the directory used for the on-disk cache.
# joblib renamed Memory's ``cachedir`` keyword to ``location``.
memory = Memory(location=cachedir, verbose=0)


@memory.cache
def f(x):
    """Return ``x`` unchanged, printing a message whenever the body runs."""
    # Wrap x in a one-tuple so that tuple arguments format correctly.
    print('Running f(%s)' % (x,))
    return x
听起来好像您不是在要求一个通用的记忆化装饰器(也就是说,您对想要缓存不同参数值的返回值的一般情况不感兴趣)。也就是说,你想要这样:
x = obj.name # expensive
y = obj.name # cheap
而一个通用的记忆装饰器会给你这样的:
x = obj.name() # expensive
y = obj.name() # cheap
我认为方法调用语法是更好的风格,因为它暗示了昂贵计算的可能性,而属性语法暗示了快速查找。
[更新:我之前链接并引用的基于类的记忆化装饰器不适用于方法,我已将其替换为一个装饰器函数。] 如果你愿意使用通用的记忆化装饰器,这里有一个简单的实现:
def memoize(function):
    """Memoize ``function`` with an unbounded per-function cache.

    Results are stored by call arguments, so repeated calls with equal
    arguments return the stored value instead of running ``function``
    again. Positional and keyword arguments are both supported; all of them
    must be hashable.

    Args:
        function: The callable to wrap.

    Returns:
        A wrapper with the same name and docstring as ``function``.
    """
    from functools import wraps

    memo = {}

    @wraps(function)
    def wrapper(*args, **kwargs):
        # Keyword order must not matter, so sort the items by name.
        key = (args, tuple(sorted(kwargs.items())))
        try:
            return memo[key]
        except KeyError:
            rv = memo[key] = function(*args, **kwargs)
            return rv

    return wrapper
使用示例:
@memoize
def fibonacci(n):
    """Return the n-th Fibonacci number; values of ``n`` below 2 return ``n``."""
    return n if n < 2 else fibonacci(n - 1) + fibonacci(n - 2)
可以在这里找到另一个限制缓存大小的记忆化装饰器。
@lru_cache 不能很好地处理带默认值的参数。
我的 @mem 装饰器:
import inspect
from copy import deepcopy
from functools import lru_cache, wraps
from typing import Any, Callable, Dict, Iterable
# helper
def get_all_kwargs_values(f: Callable, kwargs: Dict[str, Any]) -> Iterable[Any]:
    """Yield the effective keyword values of a call to ``f``.

    The declared defaults of ``f`` are merged with ``kwargs`` (explicit
    values win) and the resulting values are yielded ordered by parameter
    name.

    Args:
        f: Callable whose signature supplies the default values.
        kwargs: Keyword arguments actually passed by the caller.

    Yields:
        The merged values, sorted by their keyword name.
    """
    # Only the mapping is rebuilt; the default objects themselves are not
    # copied. Deep-copying them was unnecessary and raised for defaults that
    # cannot be copied (locks, modules, ...).
    merged = {
        name: parameter.default
        for name, parameter in inspect.signature(f).parameters.items()
        if parameter.default is not inspect.Parameter.empty
    }
    merged.update(kwargs)
    for key in sorted(merged):
        yield merged[key]
# the best decorator
def mem(func: Callable) -> Callable:
    """Memoize ``func``, treating omitted defaults like explicit values.

    Each call is bound to the signature of ``func`` and defaults are filled
    in, so ``f(1)``, ``f(1, z=10)`` and (for positional-or-keyword
    parameters) ``f(1, 10)`` share one cache entry when ``z`` defaults to
    ``10``. All argument values must be hashable.

    Args:
        func: The callable to wrap.

    Returns:
        A wrapper that returns the stored result for repeated calls.
    """
    signature = inspect.signature(func)
    results: Dict[Any, Any] = {}

    @wraps(func)
    def wrapper(*args, **kwargs) -> Any:
        bound = signature.bind(*args, **kwargs)
        bound.apply_defaults()
        # Use the normalized arguments themselves as the key. Keying on
        # hash(...) alone returned wrong results for colliding hashes
        # (e.g. -1 and -2), and dropping keyword names confused f(x=1)
        # with f(y=1).
        key = (bound.args, tuple(sorted(bound.kwargs.items())))
        if key not in results:
            results[key] = func(*args, **kwargs)
        return results[key]

    return wrapper
# some logic
def counter(*args) -> int:
    """Print a marker showing the body really ran, then return the sum of ``args``."""
    marker = '* not_cached:'
    print(marker, end='\t')
    total = sum(args)
    return total
@mem
def check_mem(a, *args, z=10) -> int:
    """Sum ``a``, ``args`` and ``z`` via ``counter``, cached with ``@mem``."""
    total = counter(a, *args, z)
    return total
@lru_cache
def check_lru(a, *args, z=10) -> int:
    """Sum ``a``, ``args`` and ``z`` via ``counter``, cached with ``@lru_cache``."""
    total = counter(a, *args, z)
    return total
def test(func) -> None:
    """Call ``func`` with a fixed series of arguments and print each result.

    Every call pattern is issued so that a working cache shows a hit on the
    repeated calls; the last two calls differ only in whether the default
    ``z=10`` is passed explicitly.
    """
    print(f'\nTest {func.__name__}:')
    scenarios = (
        ((1, 2, 3, 4, 5), {}),
        ((1, 2, 3, 4, 5), {}),
        ((1, 2, 3, 4, 5), {'z': 6}),
        ((1, 2, 3, 4, 5), {'z': 6}),
        ((1,), {}),
        ((1,), {'z': 10}),
    )
    for positional, keyword in scenarios:
        print('*', func(*positional, **keyword))
def main():
    """Run the demonstration against ``check_mem`` and then ``check_lru``."""
    for candidate in (check_mem, check_lru):
        test(candidate)
if __name__ == '__main__':
main()
输出:
Test check_mem:
* not_cached: * 25
* 25
* not_cached: * 21
* 21
* not_cached: * 11
* 11
Test check_lru:
* not_cached: * 25
* 25
* not_cached: * 21
* 21
* not_cached: * 11
* not_cached: * 11
functools.cache 已在 Python 3.9 中发布(文档):
from functools import cache
@cache
def factorial(n):
    """Return ``n!`` recursively; a falsy ``n`` (such as 0) yields 1."""
    if not n:
        return 1
    return n * factorial(n - 1)
在以前的Python版本中,早期的答案之一仍然是有效的解决方案:使用lru_cache作为普通缓存,没有限制和lru特性。(文档)
如果 maxsize 设置为 None,则会禁用 LRU 特性,缓存可以无限增长。
这里有一个更漂亮的版本:
# Unbounded memoizing decorator: with no size limit the LRU logic is disabled.
cache = lru_cache(None)


@cache
def func(param1):
    """Placeholder showing how the ``cache`` alias is applied; returns None."""
    return None
from functools import wraps
def cache(maxsize=128):
    """Build a decorator that memoizes a function's return values.

    Results are kept in a per-function dictionary keyed by the call
    arguments. When the store is full, the oldest inserted entry is evicted
    (insertion order; a cache hit does not refresh an entry's position).

    Args:
        maxsize: Maximum number of results kept per decorated function.
            ``None`` means unbounded; ``0`` or a negative value disables
            storing results altogether.

    Returns:
        A decorator. The wrapped function accepts an extra keyword-only
        argument ``no_cache``; when true, the call bypasses the cache
        entirely (the result is neither looked up nor stored).
    """

    def decorator(func):
        # One store per decorated function, so reusing the same decorator
        # object on several functions cannot mix up their results.
        store = {}

        @wraps(func)
        def inner(*args, no_cache=False, **kwargs):
            if no_cache:
                return func(*args, **kwargs)

            # Key on the argument values themselves rather than on their
            # joined string form, so f(1) / f("1") and f("a_b") / f("a", "b")
            # no longer share an entry. Keyword order is irrelevant.
            key = (args, tuple(sorted(kwargs.items())))
            try:
                hash(key)
            except TypeError:
                # Unhashable arguments (lists, dicts, ...) fall back to a
                # textual key so such calls keep working as before.
                key = repr(key)

            if key in store:
                return store[key]

            result = func(*args, **kwargs)
            if maxsize is None:
                store[key] = result
            elif maxsize > 0:
                # Evict before inserting so the store never exceeds maxsize.
                while len(store) >= maxsize and key not in store:
                    del store[next(iter(store))]
                store[key] = result
            return result

        return inner

    return decorator
def async_cache(maxsize=128):
    """Build a decorator that memoizes the results of a coroutine function.

    The awaited result (not the coroutine object) is stored in a
    per-function dictionary keyed by the call arguments. When the store is
    full, the oldest inserted entry is evicted (insertion order; a cache hit
    does not refresh an entry's position).

    Args:
        maxsize: Maximum number of results kept per decorated function.
            ``None`` means unbounded; ``0`` or a negative value disables
            storing results altogether.

    Returns:
        A decorator. The wrapped coroutine function accepts an extra
        keyword-only argument ``no_cache``; when true, the call bypasses the
        cache entirely (the result is neither looked up nor stored).
    """

    def decorator(func):
        # One store per decorated function, so reusing the same decorator
        # object on several functions cannot mix up their results.
        store = {}

        @wraps(func)
        async def inner(*args, no_cache=False, **kwargs):
            if no_cache:
                return await func(*args, **kwargs)

            # Key on the argument values themselves rather than on their
            # joined string form, so f(1) / f("1") and f("a_b") / f("a", "b")
            # no longer share an entry. Keyword order is irrelevant.
            key = (args, tuple(sorted(kwargs.items())))
            try:
                hash(key)
            except TypeError:
                # Unhashable arguments (lists, dicts, ...) fall back to a
                # textual key so such calls keep working as before.
                key = repr(key)

            if key in store:
                return store[key]

            result = await func(*args, **kwargs)
            if maxsize is None:
                store[key] = result
            elif maxsize > 0:
                # Evict before inserting so the store never exceeds maxsize.
                while len(store) >= maxsize and key not in store:
                    del store[next(iter(store))]
                store[key] = result
            return result

        return inner

    return decorator
示例使用
import asyncio
import aiohttp
# Removes the aiohttp ClientSession instance warning.
class HTTPSession(aiohttp.ClientSession):
    """aiohttp client session that tries to close itself when finalized.

    ``__del__`` is overridden so that a session which was never closed
    explicitly gets closed when the object is garbage-collected; this is
    intended to avoid aiohttp's unclosed-session warning.
    """

    def __init__(self, loop=None) -> None:
        """Create the session on ``loop``, or on the current event loop."""
        super().__init__(loop=loop or asyncio.get_event_loop())

    def __del__(self) -> None:
        """Close the session, and then its event loop, if still open."""
        if self.closed:
            return
        loop = self.loop
        # run_until_complete() raises RuntimeError on a closed or already
        # running loop; inside a finalizer that only produces noise and
        # leaves an un-awaited coroutine behind, so skip the cleanup then.
        if loop.is_closed() or loop.is_running():
            return
        loop.run_until_complete(self.close())
        loop.close()
session = HTTPSession()
@async_cache()
async def query(url, method="get", res_method="text", *args, **kwargs):
    """Send an HTTP request through the shared session and read the body.

    Args:
        url: Target URL.
        method: Name of the session method to call; lower-cased before the
            lookup (e.g. ``"get"``, ``"post"``).
        res_method: Name of the response method awaited to produce the
            return value (e.g. ``"text"``).
        *args, **kwargs: Forwarded to the session method.

    Returns:
        Whatever the awaited response method returns.
    """
    request = getattr(session, method.lower())
    async with request(url, *args, **kwargs) as response:
        reader = getattr(response, res_method)
        return await reader()
async def get(url, *args, **kwargs):
    """Run ``query`` with the ``"get"`` method and return its result."""
    response = await query(url, "get", *args, **kwargs)
    return response
async def post(url, *args, **kwargs):
    """Run ``query`` with the ``"post"`` method and return its result."""
    response = await query(url, "post", *args, **kwargs)
    return response
async def delete(url, *args, **kwargs):
    """Run ``query`` with the ``"delete"`` method and return its result."""
    response = await query(url, "delete", *args, **kwargs)
    return response