我有一个循环,开头为for I,范围为(0,100)。正常情况下,它可以正常运行,但有时由于网络条件而出现故障。目前,我已经设置它,以便在失败时,它将继续在except子句中(继续到I的下一个数字)。
我是否可以将相同的数字重新分配给i,并再次运行失败的循环迭代?
我有一个循环,开头为for I,范围为(0,100)。正常情况下,它可以正常运行,但有时由于网络条件而出现故障。目前,我已经设置它,以便在失败时,它将继续在except子句中(继续到I的下一个数字)。
我是否可以将相同的数字重新分配给i,并再次运行失败的循环迭代?
当前回答
使用while和计数器:
count = 1
while count <= 3: # try 3 times
try:
# do_the_logic()
break
except SomeSpecificException as e:
# If trying 3rd time and still error??
# Just throw the error- we don't have anything to hide :)
if count == 3:
raise
count += 1
其他回答
以下是我对这个问题的看法。下面的重试功能支持以下特性:
当调用成功时返回被调用函数的值 如果尝试失败,则引发被调用函数的异常 尝试次数限制(0表示无限) 在尝试之间等待(线性或指数) 仅当异常是特定异常类型的实例时重试。 可选的尝试记录
import time
def retry(func, ex_type=Exception, limit=0, wait_ms=100, wait_increase_ratio=2, logger=None):
attempt = 1
while True:
try:
return func()
except Exception as ex:
if not isinstance(ex, ex_type):
raise ex
if 0 < limit <= attempt:
if logger:
logger.warning("no more attempts")
raise ex
if logger:
logger.error("failed execution attempt #%d", attempt, exc_info=ex)
attempt += 1
if logger:
logger.info("waiting %d ms before attempt #%d", wait_ms, attempt)
time.sleep(wait_ms / 1000)
wait_ms *= wait_increase_ratio
用法:
def fail_randomly():
y = random.randint(0, 10)
if y < 10:
y = 0
return x / y
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(stream=sys.stdout))
logger.info("starting")
result = retry.retry(fail_randomly, ex_type=ZeroDivisionError, limit=20, logger=logger)
logger.info("result is: %s", result)
更多信息请看我的帖子。
我倾向于限制重试次数,这样如果某个特定项目出现问题,你就可以继续进行下一个项目,如下:
for i in range(100):
for attempt in range(10):
try:
# do thing
except:
# perhaps reconnect, etc.
else:
break
else:
# we failed all the attempts - deal with the consequences.
使用这个装饰器,您可以轻松地控制错误
class catch:
def __init__(self, max=1, callback=None):
self.max = max
self.callback = callback
def set_max(self, max):
self.max = max
def handler(self, *args, **kwargs):
self.index = 0
while self.index < self.max:
self.index += 1
try:
self.func(self, *args, **kwargs)
except Exception as error:
if callable(self.callback):
self.callback(self, error, args, kwargs)
def __call__(self, func):
self.func = func
return self.handler
import time
def callback(cls, error, args, kwargs):
print('func args', args, 'func kwargs', kwargs)
print('error', repr(error), 'trying', cls.index)
if cls.index == 2:
cls.set_max(4)
else:
time.sleep(1)
@catch(max=2, callback=callback)
def test(cls, ok, **kwargs):
raise ValueError('ok')
test(1, message='hello')
使用递归
for i in range(100):
def do():
try:
## Network related scripts
except SpecificException as ex:
do()
do() ## invoke do() whenever required inside this loop
如果你想要一个没有嵌套循环和成功调用break的解决方案,你可以为任何可迭代对象开发一个快速的可检索包装。这里有一个我经常遇到的网络问题的例子——保存的身份验证过期。它的用法是这样的:
client = get_client()
smart_loop = retriable(list_of_values):
for value in smart_loop:
try:
client.do_something_with(value)
except ClientAuthExpired:
client = get_client()
smart_loop.retry()
continue
except NetworkTimeout:
smart_loop.retry()
continue