我有一个循环,开头为for I,范围为(0,100)。正常情况下,它可以正常运行,但有时由于网络条件而出现故障。目前,我已经设置它,以便在失败时,它将继续在except子句中(继续到I的下一个数字)。

我是否可以将相同的数字重新分配给i,并再次运行失败的循环迭代?


当前回答

如果你想要一个没有嵌套循环和成功调用break的解决方案,你可以为任何可迭代对象开发一个快速的可检索包装。这里有一个我经常遇到的网络问题的例子——保存的身份验证过期。它的用法是这样的:

client = get_client()
smart_loop = retriable(list_of_values):

for value in smart_loop:
    try:
        client.do_something_with(value)
    except ClientAuthExpired:
        client = get_client()
        smart_loop.retry()
        continue
    except NetworkTimeout:
        smart_loop.retry()
        continue

其他回答

只有当try子句成功时才增加循环变量

Decorator是一个很好的方法。

from functools import wraps
import time

class retry:
    def __init__(self, success=lambda r:True, times=3, delay=1, raiseexception=True, echo=True):
        self.success = success
        self.times = times
        self.raiseexception = raiseexception
        self.echo = echo
        self.delay = delay
    def retry(fun, *args, success=lambda r:True, times=3, delay=1, raiseexception=True, echo=True, **kwargs):
        ex = Exception(f"{fun} failed.")
        r = None
        for i in range(times):
            if i > 0:
                time.sleep(delay*2**(i-1))
            try:
                r = fun(*args, **kwargs)
                s = success(r)
            except Exception as e:
                s = False
                ex = e
                # raise e
            if not s:
                continue
            return r
        else:
            if echo:
                print(f"{fun} failed.", "args:", args, kwargs, "\nresult: %s"%r)
            if raiseexception:
                raise ex
    def __call__(self, fun):
        @wraps(fun)
        def wraper(*args, retry=0, **kwargs):
            retry = retry if retry>0 else self.times
            return self.__class__.retry(fun, *args, 
                                        success=self.success, 
                                        times=retry,
                                        delay=self.delay,
                                        raiseexception = self.raiseexception,
                                        echo = self.echo,
                                        **kwargs)
        return wraper

一些用法示例:

@retry(success=lambda x:x>3, times=4, delay=0.1)
def rf1(x=[]):
    x.append(1)
    print(x)
    return len(x)
> rf1()

[1]
[1, 1]
[1, 1, 1]
[1, 1, 1, 1]

4
@retry(success=lambda x:x>3, times=4, delay=0.1)
def rf2(l=[], v=1):
    l.append(v)
    print(l)
    assert len(l)>4
    return len(l)
> rf2(v=2, retry=10) #overwite times=4

[2]
[2, 2]
[2, 2, 2]
[2, 2, 2, 2]
[2, 2, 2, 2, 2]

5
> retry.retry(lambda a,b:a+b, 1, 2, times=2)

3
> retry.retry(lambda a,b:a+b, 1, "2", times=2)

TypeError: unsupported operand type(s) for +: 'int' and 'str'

重新尝试的替代方案:坚韧和退缩(2020年更新)

重新尝试库是以前的方法,但遗憾的是,它有一些bug,自2016年以来就没有任何更新。其他的选择似乎是后退和坚韧。在写这篇文章的时候,tenacity有更多的GItHub星(2.3k vs 1.2k),并且最近更新了,因此我选择使用它。这里有一个例子:

from functools import partial
import random # producing random errors for this example

from tenacity import retry, stop_after_delay, wait_fixed, retry_if_exception_type

# Custom error type for this example
class CommunicationError(Exception):
    pass

# Define shorthand decorator for the used settings.
retry_on_communication_error = partial(
    retry,
    stop=stop_after_delay(10),  # max. 10 seconds wait.
    wait=wait_fixed(0.4),  # wait 400ms 
    retry=retry_if_exception_type(CommunicationError),
)()


@retry_on_communication_error
def do_something_unreliable(i):
    if random.randint(1, 5) == 3:
        print('Run#', i, 'Error occured. Retrying.')
        raise CommunicationError()

for i in range(100):
    do_something_unreliable(i)

上面的代码输出如下:

Run# 3 Error occured. Retrying.
Run# 5 Error occured. Retrying.
Run# 6 Error occured. Retrying.
Run# 6 Error occured. Retrying.
Run# 10 Error occured. Retrying.
.
.
.

坚韧的更多设置。坚韧GitHub页面上列出了重试。

我使用这个,它可以用于任何函数:

def run_with_retry(func: callable, max_retries: int = 3, wait_seconds: int = 2, **func_params):
num_retries = 1
while True:
    try:
        return func(*func_params.values())
    except Exception as e:
        if num_retries > max_retries:
            print('we have reached maximum errors and raising the exception')
            raise e
        else:
            print(f'{num_retries}/{max_retries}')
            print("Retrying error:", e)
            num_retries += 1
            sleep(wait_seconds)

像这样调用:

    def add(val1, val2):
        return val1 + val2

    run_with_retry(func=add, param1=10, param2=20)

更新2021-12-01:

自2016年6月起,不再维护重试包。 考虑使用活动的fork github.com/jd/tenacity,或者github.com/litl/backoff。


重试包是在失败时重试代码块的好方法。

例如:

@retry(wait_random_min=1000, wait_random_max=2000)
def wait_random_1_to_2_s():
    print("Randomly wait 1 to 2 seconds between retries")