我有一个循环,开头为for I,范围为(0,100)。正常情况下,它可以正常运行,但有时由于网络条件而出现故障。目前,我已经设置它,以便在失败时,它将继续在except子句中(继续到I的下一个数字)。
我是否可以将相同的数字重新分配给i,并再次运行失败的循环迭代?
我有一个循环,开头为for I,范围为(0,100)。正常情况下,它可以正常运行,但有时由于网络条件而出现故障。目前,我已经设置它,以便在失败时,它将继续在except子句中(继续到I的下一个数字)。
我是否可以将相同的数字重新分配给i,并再次运行失败的循环迭代?
当前回答
如果你想要一个没有嵌套循环和成功调用break的解决方案,你可以为任何可迭代对象开发一个快速的可检索包装。这里有一个我经常遇到的网络问题的例子——保存的身份验证过期。它的用法是这样的:
client = get_client()
smart_loop = retriable(list_of_values):
for value in smart_loop:
try:
client.do_something_with(value)
except ClientAuthExpired:
client = get_client()
smart_loop.retry()
continue
except NetworkTimeout:
smart_loop.retry()
continue
其他回答
只有当try子句成功时才增加循环变量
Decorator是一个很好的方法。
from functools import wraps
import time
class retry:
def __init__(self, success=lambda r:True, times=3, delay=1, raiseexception=True, echo=True):
self.success = success
self.times = times
self.raiseexception = raiseexception
self.echo = echo
self.delay = delay
def retry(fun, *args, success=lambda r:True, times=3, delay=1, raiseexception=True, echo=True, **kwargs):
ex = Exception(f"{fun} failed.")
r = None
for i in range(times):
if i > 0:
time.sleep(delay*2**(i-1))
try:
r = fun(*args, **kwargs)
s = success(r)
except Exception as e:
s = False
ex = e
# raise e
if not s:
continue
return r
else:
if echo:
print(f"{fun} failed.", "args:", args, kwargs, "\nresult: %s"%r)
if raiseexception:
raise ex
def __call__(self, fun):
@wraps(fun)
def wraper(*args, retry=0, **kwargs):
retry = retry if retry>0 else self.times
return self.__class__.retry(fun, *args,
success=self.success,
times=retry,
delay=self.delay,
raiseexception = self.raiseexception,
echo = self.echo,
**kwargs)
return wraper
一些用法示例:
@retry(success=lambda x:x>3, times=4, delay=0.1)
def rf1(x=[]):
x.append(1)
print(x)
return len(x)
> rf1()
[1]
[1, 1]
[1, 1, 1]
[1, 1, 1, 1]
4
@retry(success=lambda x:x>3, times=4, delay=0.1)
def rf2(l=[], v=1):
l.append(v)
print(l)
assert len(l)>4
return len(l)
> rf2(v=2, retry=10) #overwite times=4
[2]
[2, 2]
[2, 2, 2]
[2, 2, 2, 2]
[2, 2, 2, 2, 2]
5
> retry.retry(lambda a,b:a+b, 1, 2, times=2)
3
> retry.retry(lambda a,b:a+b, 1, "2", times=2)
TypeError: unsupported operand type(s) for +: 'int' and 'str'
重新尝试的替代方案:坚韧和退缩(2020年更新)
重新尝试库是以前的方法,但遗憾的是,它有一些bug,自2016年以来就没有任何更新。其他的选择似乎是后退和坚韧。在写这篇文章的时候,tenacity有更多的GItHub星(2.3k vs 1.2k),并且最近更新了,因此我选择使用它。这里有一个例子:
from functools import partial
import random # producing random errors for this example
from tenacity import retry, stop_after_delay, wait_fixed, retry_if_exception_type
# Custom error type for this example
class CommunicationError(Exception):
pass
# Define shorthand decorator for the used settings.
retry_on_communication_error = partial(
retry,
stop=stop_after_delay(10), # max. 10 seconds wait.
wait=wait_fixed(0.4), # wait 400ms
retry=retry_if_exception_type(CommunicationError),
)()
@retry_on_communication_error
def do_something_unreliable(i):
if random.randint(1, 5) == 3:
print('Run#', i, 'Error occured. Retrying.')
raise CommunicationError()
for i in range(100):
do_something_unreliable(i)
上面的代码输出如下:
Run# 3 Error occured. Retrying.
Run# 5 Error occured. Retrying.
Run# 6 Error occured. Retrying.
Run# 6 Error occured. Retrying.
Run# 10 Error occured. Retrying.
.
.
.
坚韧的更多设置。坚韧GitHub页面上列出了重试。
我使用这个,它可以用于任何函数:
def run_with_retry(func: callable, max_retries: int = 3, wait_seconds: int = 2, **func_params):
num_retries = 1
while True:
try:
return func(*func_params.values())
except Exception as e:
if num_retries > max_retries:
print('we have reached maximum errors and raising the exception')
raise e
else:
print(f'{num_retries}/{max_retries}')
print("Retrying error:", e)
num_retries += 1
sleep(wait_seconds)
像这样调用:
def add(val1, val2):
return val1 + val2
run_with_retry(func=add, param1=10, param2=20)
更新2021-12-01:
自2016年6月起,不再维护重试包。 考虑使用活动的fork github.com/jd/tenacity,或者github.com/litl/backoff。
重试包是在失败时重试代码块的好方法。
例如:
@retry(wait_random_min=1000, wait_random_max=2000)
def wait_random_1_to_2_s():
print("Randomly wait 1 to 2 seconds between retries")