我有一个循环,开头为for I,范围为(0,100)。正常情况下,它可以正常运行,但有时由于网络条件而出现故障。目前,我已经设置它,以便在失败时,它将继续在except子句中(继续到I的下一个数字)。
我是否可以将相同的数字重新分配给i,并再次运行失败的循环迭代?
我有一个循环,开头为for I,范围为(0,100)。正常情况下,它可以正常运行,但有时由于网络条件而出现故障。目前,我已经设置它,以便在失败时,它将继续在except子句中(继续到I的下一个数字)。
我是否可以将相同的数字重新分配给i,并再次运行失败的循环迭代?
当前回答
如果你想要一个没有嵌套循环和成功调用break的解决方案,你可以为任何可迭代对象开发一个快速的可检索包装。这里有一个我经常遇到的网络问题的例子——保存的身份验证过期。它的用法是这样的:
client = get_client()
smart_loop = retriable(list_of_values):
for value in smart_loop:
try:
client.do_something_with(value)
except ClientAuthExpired:
client = get_client()
smart_loop.retry()
continue
except NetworkTimeout:
smart_loop.retry()
continue
其他回答
Decorator是一个很好的方法。
from functools import wraps
import time
class retry:
def __init__(self, success=lambda r:True, times=3, delay=1, raiseexception=True, echo=True):
self.success = success
self.times = times
self.raiseexception = raiseexception
self.echo = echo
self.delay = delay
def retry(fun, *args, success=lambda r:True, times=3, delay=1, raiseexception=True, echo=True, **kwargs):
ex = Exception(f"{fun} failed.")
r = None
for i in range(times):
if i > 0:
time.sleep(delay*2**(i-1))
try:
r = fun(*args, **kwargs)
s = success(r)
except Exception as e:
s = False
ex = e
# raise e
if not s:
continue
return r
else:
if echo:
print(f"{fun} failed.", "args:", args, kwargs, "\nresult: %s"%r)
if raiseexception:
raise ex
def __call__(self, fun):
@wraps(fun)
def wraper(*args, retry=0, **kwargs):
retry = retry if retry>0 else self.times
return self.__class__.retry(fun, *args,
success=self.success,
times=retry,
delay=self.delay,
raiseexception = self.raiseexception,
echo = self.echo,
**kwargs)
return wraper
一些用法示例:
@retry(success=lambda x:x>3, times=4, delay=0.1)
def rf1(x=[]):
x.append(1)
print(x)
return len(x)
> rf1()
[1]
[1, 1]
[1, 1, 1]
[1, 1, 1, 1]
4
@retry(success=lambda x:x>3, times=4, delay=0.1)
def rf2(l=[], v=1):
l.append(v)
print(l)
assert len(l)>4
return len(l)
> rf2(v=2, retry=10) #overwite times=4
[2]
[2, 2]
[2, 2, 2]
[2, 2, 2, 2]
[2, 2, 2, 2, 2]
5
> retry.retry(lambda a,b:a+b, 1, 2, times=2)
3
> retry.retry(lambda a,b:a+b, 1, "2", times=2)
TypeError: unsupported operand type(s) for +: 'int' and 'str'
不使用那些丑陋的while循环的更“功能性”的方法:
def tryAgain(retries=0):
if retries > 10: return
try:
# Do stuff
except:
retries+=1
tryAgain(retries)
tryAgain()
以下是我对这个问题的看法。下面的重试功能支持以下特性:
当调用成功时返回被调用函数的值 如果尝试失败,则引发被调用函数的异常 尝试次数限制(0表示无限) 在尝试之间等待(线性或指数) 仅当异常是特定异常类型的实例时重试。 可选的尝试记录
import time
def retry(func, ex_type=Exception, limit=0, wait_ms=100, wait_increase_ratio=2, logger=None):
attempt = 1
while True:
try:
return func()
except Exception as ex:
if not isinstance(ex, ex_type):
raise ex
if 0 < limit <= attempt:
if logger:
logger.warning("no more attempts")
raise ex
if logger:
logger.error("failed execution attempt #%d", attempt, exc_info=ex)
attempt += 1
if logger:
logger.info("waiting %d ms before attempt #%d", wait_ms, attempt)
time.sleep(wait_ms / 1000)
wait_ms *= wait_increase_ratio
用法:
def fail_randomly():
y = random.randint(0, 10)
if y < 10:
y = 0
return x / y
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(stream=sys.stdout))
logger.info("starting")
result = retry.retry(fail_randomly, ex_type=ZeroDivisionError, limit=20, logger=logger)
logger.info("result is: %s", result)
更多信息请看我的帖子。
更新2021-12-01:
自2016年6月起,不再维护重试包。 考虑使用活动的fork github.com/jd/tenacity,或者github.com/litl/backoff。
重试包是在失败时重试代码块的好方法。
例如:
@retry(wait_random_min=1000, wait_random_max=2000)
def wait_random_1_to_2_s():
print("Randomly wait 1 to 2 seconds between retries")
这里有一个与其他解决方案类似的解决方案,但是如果在规定的次数或重试次数内没有成功,它将引发异常。
tries = 3
for i in range(tries):
try:
do_the_thing()
except KeyError as e:
if i < tries - 1: # i is zero indexed
continue
else:
raise
break