我有一个循环,开头为for I,范围为(0,100)。正常情况下,它可以正常运行,但有时由于网络条件而出现故障。目前,我已经设置它,以便在失败时,它将继续在except子句中(继续到I的下一个数字)。
我是否可以将相同的数字重新分配给i,并再次运行失败的循环迭代?
我有一个循环,开头为for I,范围为(0,100)。正常情况下,它可以正常运行,但有时由于网络条件而出现故障。目前,我已经设置它,以便在失败时,它将继续在except子句中(继续到I的下一个数字)。
我是否可以将相同的数字重新分配给i,并再次运行失败的循环迭代?
当前回答
在Python装饰器库中也有类似的东西。
请记住,它不测试异常,而是测试返回值。它会重新尝试,直到被修饰的函数返回True。
稍微修改一下版本就可以了。
其他回答
我喜欢laurent-laporte的回答。下面是我的版本,它包装在一个类与静态方法和一些例子。我实现了重试计数作为另一种重试方式。还增加了kwargs。
from typing import List
import time
class Retry:
@staticmethod
def onerror_retry(exception, callback, retries: int = 0, timeout: float = 0, timedelta: float = 0,
errors: List = None, **kwargs):
"""
@param exception: The exception to trigger retry handling with.
@param callback: The function that will potentially fail with an exception
@param retries: Optional total number of retries, regardless of timing if this threshold is met, the call will
raise the exception.
@param timeout: Optional total amount of time to do retries after which the call will raise an exception
@param timedelta: Optional amount of time to sleep in between calls
@param errors: A list to receive all the exceptions that were caught.
@param kwargs: An optional key value parameters to pass to the function to retry.
"""
for retry in Retry.__onerror_retry(exception, callback, retries, timeout, timedelta, errors, **kwargs):
if retry: retry(**kwargs) # retry will be None when all retries fail.
@staticmethod
def __onerror_retry(exception, callback, retries: int = 0, timeout: float = 0, timedelta: float = 0,
errors: List = None, **kwargs):
end_time = time.time() + timeout
continues = 0
while True:
try:
yield callback(**kwargs)
break
except exception as ex:
print(ex)
if errors:
errors.append(ex)
continues += 1
if 0 < retries < continues:
print('ran out of retries')
raise
if timeout > 0 and time.time() > end_time:
print('ran out of time')
raise
elif timedelta > 0:
time.sleep(timedelta)
err = 0
#
# sample dumb fail function
def fail_many_times(**kwargs):
global err
err += 1
max_errors = kwargs.pop('max_errors', '') or 1
if err < max_errors:
raise ValueError("I made boo boo.")
print("Successfully did something.")
#
# Example calls
try:
#
# retries with a parameter that overrides retries... just because
Retry.onerror_retry(ValueError, fail_many_times, retries=5, max_errors=3)
err = 0
#
# retries that run out of time, with 1 second sleep between retries.
Retry.onerror_retry(ValueError, fail_many_times, timeout=5, timedelta=1, max_errors=30)
except Exception as err:
print(err)
使用这个装饰器,您可以轻松地控制错误
class catch:
def __init__(self, max=1, callback=None):
self.max = max
self.callback = callback
def set_max(self, max):
self.max = max
def handler(self, *args, **kwargs):
self.index = 0
while self.index < self.max:
self.index += 1
try:
self.func(self, *args, **kwargs)
except Exception as error:
if callable(self.callback):
self.callback(self, error, args, kwargs)
def __call__(self, func):
self.func = func
return self.handler
import time
def callback(cls, error, args, kwargs):
print('func args', args, 'func kwargs', kwargs)
print('error', repr(error), 'trying', cls.index)
if cls.index == 2:
cls.set_max(4)
else:
time.sleep(1)
@catch(max=2, callback=callback)
def test(cls, ok, **kwargs):
raise ValueError('ok')
test(1, message='hello')
只有当try子句成功时才增加循环变量
如果你想要一个没有嵌套循环和成功调用break的解决方案,你可以为任何可迭代对象开发一个快速的可检索包装。这里有一个我经常遇到的网络问题的例子——保存的身份验证过期。它的用法是这样的:
client = get_client()
smart_loop = retriable(list_of_values):
for value in smart_loop:
try:
client.do_something_with(value)
except ClientAuthExpired:
client = get_client()
smart_loop.retry()
continue
except NetworkTimeout:
smart_loop.retry()
continue
你可以有一个专门的函数使用返回短路结果。比如这样:
def my_function_with_retries(..., max_retries=100):
for attempt in range(max_retries):
try:
return my_function(...)
except SomeSpecificException as error:
logging.warning(f"Retrying after failed execution: {error}")
raise SomeOtherException()