我有一个循环,开头为for I,范围为(0,100)。正常情况下,它可以正常运行,但有时由于网络条件而出现故障。目前,我已经设置它,以便在失败时,它将继续在except子句中(继续到I的下一个数字)。
我是否可以将相同的数字重新分配给i,并再次运行失败的循环迭代?
我有一个循环,开头为for I,范围为(0,100)。正常情况下,它可以正常运行,但有时由于网络条件而出现故障。目前,我已经设置它,以便在失败时,它将继续在except子句中(继续到I的下一个数字)。
我是否可以将相同的数字重新分配给i,并再次运行失败的循环迭代?
当前回答
你可以有一个专门的函数使用返回短路结果。比如这样:
def my_function_with_retries(..., max_retries=100):
for attempt in range(max_retries):
try:
return my_function(...)
except SomeSpecificException as error:
logging.warning(f"Retrying after failed execution: {error}")
raise SomeOtherException()
其他回答
这里有一个快速装饰器来处理这个问题。7行,没有依赖关系。
def retry(exception=Exception, retries=3, delay=0):
def wrap(func):
for i in range(retries):
try:
return func()
except exception as e:
print(f'Retrying {func.__name__}: {i}/{retries}')
time.sleep(delay)
raise e
return wrap
@retry()
def do_something():
...
@retry(HTTPError, retries=100, delay=3)
def download_something():
...
可以添加的一个功能是扩展异常以处理多个异常(splat一个列表)。
只有当try子句成功时才增加循环变量
Decorator是一个很好的方法。
from functools import wraps
import time
class retry:
def __init__(self, success=lambda r:True, times=3, delay=1, raiseexception=True, echo=True):
self.success = success
self.times = times
self.raiseexception = raiseexception
self.echo = echo
self.delay = delay
def retry(fun, *args, success=lambda r:True, times=3, delay=1, raiseexception=True, echo=True, **kwargs):
ex = Exception(f"{fun} failed.")
r = None
for i in range(times):
if i > 0:
time.sleep(delay*2**(i-1))
try:
r = fun(*args, **kwargs)
s = success(r)
except Exception as e:
s = False
ex = e
# raise e
if not s:
continue
return r
else:
if echo:
print(f"{fun} failed.", "args:", args, kwargs, "\nresult: %s"%r)
if raiseexception:
raise ex
def __call__(self, fun):
@wraps(fun)
def wraper(*args, retry=0, **kwargs):
retry = retry if retry>0 else self.times
return self.__class__.retry(fun, *args,
success=self.success,
times=retry,
delay=self.delay,
raiseexception = self.raiseexception,
echo = self.echo,
**kwargs)
return wraper
一些用法示例:
@retry(success=lambda x:x>3, times=4, delay=0.1)
def rf1(x=[]):
x.append(1)
print(x)
return len(x)
> rf1()
[1]
[1, 1]
[1, 1, 1]
[1, 1, 1, 1]
4
@retry(success=lambda x:x>3, times=4, delay=0.1)
def rf2(l=[], v=1):
l.append(v)
print(l)
assert len(l)>4
return len(l)
> rf2(v=2, retry=10) #overwite times=4
[2]
[2, 2]
[2, 2, 2]
[2, 2, 2, 2]
[2, 2, 2, 2, 2]
5
> retry.retry(lambda a,b:a+b, 1, 2, times=2)
3
> retry.retry(lambda a,b:a+b, 1, "2", times=2)
TypeError: unsupported operand type(s) for +: 'int' and 'str'
我倾向于限制重试次数,这样如果某个特定项目出现问题,你就可以继续进行下一个项目,如下:
for i in range(100):
for attempt in range(10):
try:
# do thing
except:
# perhaps reconnect, etc.
else:
break
else:
# we failed all the attempts - deal with the consequences.
重新尝试的替代方案:坚韧和退缩(2020年更新)
重新尝试库是以前的方法,但遗憾的是,它有一些bug,自2016年以来就没有任何更新。其他的选择似乎是后退和坚韧。在写这篇文章的时候,tenacity有更多的GItHub星(2.3k vs 1.2k),并且最近更新了,因此我选择使用它。这里有一个例子:
from functools import partial
import random # producing random errors for this example
from tenacity import retry, stop_after_delay, wait_fixed, retry_if_exception_type
# Custom error type for this example
class CommunicationError(Exception):
pass
# Define shorthand decorator for the used settings.
retry_on_communication_error = partial(
retry,
stop=stop_after_delay(10), # max. 10 seconds wait.
wait=wait_fixed(0.4), # wait 400ms
retry=retry_if_exception_type(CommunicationError),
)()
@retry_on_communication_error
def do_something_unreliable(i):
if random.randint(1, 5) == 3:
print('Run#', i, 'Error occured. Retrying.')
raise CommunicationError()
for i in range(100):
do_something_unreliable(i)
上面的代码输出如下:
Run# 3 Error occured. Retrying.
Run# 5 Error occured. Retrying.
Run# 6 Error occured. Retrying.
Run# 6 Error occured. Retrying.
Run# 10 Error occured. Retrying.
.
.
.
坚韧的更多设置。坚韧GitHub页面上列出了重试。